/*
 * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package java.util.stream;

import java.util.Collections;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.Set;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Function;

/**
 * Test scenarios for reference streams.
 *
 * Each scenario is provided with a data source, a function that maps a fresh
 * stream (as provided by the data source) to a new stream, and a sink to
 * receive results.  Each scenario describes a different way of computing the
 * stream contents.  The test driver will ensure that all scenarios produce
 * the same output (modulo allowable differences in ordering).
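 *
 * <p>For illustration only, a test driver might exercise a single scenario
 * roughly as sketched below.  The data-source factory shown here is an
 * assumption about the surrounding test library and is not defined by this
 * class:
 *
 * <pre>{@code
 *     // Hypothetical sketch: run one scenario over a collection-backed data source
 *     TestData<Integer, Stream<Integer>> data =
 *         TestData.Factory.ofCollection("ints", Arrays.asList(1, 2, 3));  // assumed factory method
 *     List<Integer> sink = new ArrayList<>();
 *     StreamTestScenario.STREAM_COLLECT.run(data, sink::add, s -> s.map(i -> i * 2));
 *     // sink now holds the mapped elements, e.g. [2, 4, 6]
 * }</pre>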
 */
@SuppressWarnings({"rawtypes", "unchecked"})
public enum StreamTestScenario implements OpTestCase.BaseStreamTestScenario {

    // Wrap as stream (forcing sequential if parallel) + forEach
    STREAM_FOR_EACH(false) {
        <T, U, S_IN extends BaseStream<T, S_IN>>
        void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
            Stream<U> s = m.apply(source);
            if (s.isParallel()) {
                s = s.sequential();
            }
            s.forEach(b);
        }
    },

    // Collect to list
    STREAM_COLLECT(false) {
        <T, U, S_IN extends BaseStream<T, S_IN>>
        void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
            for (U t : m.apply(source).collect(Collectors.toList())) {
                b.accept(t);
            }
        }
    },

    // To array
    STREAM_TO_ARRAY(false) {
        <T, U, S_IN extends BaseStream<T, S_IN>>
        void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
            for (Object t : m.apply(source).toArray()) {
                b.accept((U) t);
            }
        }
    },

    // Wrap as stream, and iterate in pull mode
    STREAM_ITERATOR(false) {
        <T, U, S_IN extends BaseStream<T, S_IN>>
        void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
            for (Iterator<U> seqIter = m.apply(source).iterator(); seqIter.hasNext(); )
                b.accept(seqIter.next());
        }
    },

    // Wrap as stream, and spliterate then iterate in pull mode
    STREAM_SPLITERATOR(false) {
        <T, U, S_IN extends BaseStream<T, S_IN>>
        void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
            for (Spliterator<U> spl = m.apply(source).spliterator(); spl.tryAdvance(b); ) {
            }
        }
    },

    // Wrap as stream, spliterate, then split a few times mixing advances with forEach
    STREAM_SPLITERATOR_WITH_MIXED_TRAVERSE_AND_SPLIT(false) {
        <T, U, S_IN extends BaseStream<T, S_IN>>
        void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
            SpliteratorTestHelper.mixedTraverseAndSplit(b, m.apply(source).spliterator());
        }
    },

    // Wrap as stream, and spliterate then traverse with forEachRemaining
    STREAM_SPLITERATOR_FOREACH(false) {
        <T, U, S_IN extends BaseStream<T, S_IN>>
        void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
            m.apply(source).spliterator().forEachRemaining(b);
        }
    },

    // Wrap as parallel stream + sequential
    PAR_STREAM_SEQUENTIAL_FOR_EACH(true) {
        <T, U, S_IN extends BaseStream<T, S_IN>>
        void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
            m.apply(source).sequential().forEach(b);
        }
    },

    // Wrap as parallel stream + forEachOrdered
    PAR_STREAM_FOR_EACH_ORDERED(true) {
        <T, U, S_IN extends BaseStream<T, S_IN>>
        void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
            // @@@ Want to explicitly select ordered equalator
            m.apply(source).forEachOrdered(b);
        }
    },

    // Wrap as parallel stream, and spliterate then iterate sequentially
    PAR_STREAM_SPLITERATOR(true) {
        <T, U, S_IN extends BaseStream<T, S_IN>>
        void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
            for (Spliterator<U> spl = m.apply(source).spliterator(); spl.tryAdvance(b); ) {
            }
        }
    },

    // Wrap as parallel stream, and spliterate then traverse with forEachRemaining
    PAR_STREAM_SPLITERATOR_FOREACH(true) {
        <T, U, S_IN extends BaseStream<T, S_IN>>
        void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
            m.apply(source).spliterator().forEachRemaining(b);
        }
    },

    // Wrap as parallel stream + toArray
    PAR_STREAM_TO_ARRAY(true) {
        <T, U, S_IN extends BaseStream<T, S_IN>>
        void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
            for (Object t : m.apply(source).toArray())
                b.accept((U) t);
        }
    },

    // Wrap as parallel stream, get the spliterator, wrap as a stream + toArray
    PAR_STREAM_SPLITERATOR_STREAM_TO_ARRAY(true) {
        <T, U, S_IN extends BaseStream<T, S_IN>>
        void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
            Stream<U> s = m.apply(source);
            Spliterator<U> sp = s.spliterator();
            // Re-wrap the spliterator as a parallel stream, carrying over the original
            // stream's characteristics, and SIZED only if the exact size is known
            Stream<U> ss = StreamSupport.stream(() -> sp,
                                                StreamOpFlag.toCharacteristics(OpTestCase.getStreamFlags(s))
                                                | (sp.getExactSizeIfKnown() < 0 ? 0 : Spliterator.SIZED), true);
            for (Object t : ss.toArray())
                b.accept((U) t);
        }
    },

    // Wrap as parallel stream + toArray and clear SIZED flag
    PAR_STREAM_TO_ARRAY_CLEAR_SIZED(true) {
        <T, U, S_IN extends BaseStream<T, S_IN>>
        void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
            // Clear the SIZED flag by chaining a flag-declaring intermediate op
            S_IN pipe1 = (S_IN) OpTestCase.chain(source,
                                                 new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
            Stream<U> pipe2 = m.apply(pipe1);

            for (Object t : pipe2.toArray())
                b.accept((U) t);
        }
    },

    // Wrap as parallel stream + collect to list
    PAR_STREAM_COLLECT_TO_LIST(true) {
        <T, U, S_IN extends BaseStream<T, S_IN>>
        void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
            for (U u : m.apply(source).collect(Collectors.toList()))
                b.accept(u);
        }
    },

    // Wrap sequential stream as parallel + collect to list
    STREAM_TO_PAR_STREAM_COLLECT_TO_LIST(true) {
        public <T, S_IN extends BaseStream<T, S_IN>>
        S_IN getStream(TestData<T, S_IN> data) {
            return data.stream().parallel();
        }

        <T, U, S_IN extends BaseStream<T, S_IN>>
        void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
            for (U u : m.apply(source).collect(Collectors.toList()))
                b.accept(u);
        }
    },

    // Wrap parallel stream as sequential + collect to list
    PAR_STREAM_TO_STREAM_COLLECT_TO_LIST(true) {
        // Mirror the scenario above: start from a parallel stream, then convert it to sequential
        public <T, S_IN extends BaseStream<T, S_IN>>
        S_IN getStream(TestData<T, S_IN> data) {
            return data.parallelStream().sequential();
        }

        <T, U, S_IN extends BaseStream<T, S_IN>>
        void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
            for (U u : m.apply(source).collect(Collectors.toList()))
                b.accept(u);
        }
    },

    // Wrap as parallel stream + forEach, synchronizing calls to the sink
    PAR_STREAM_FOR_EACH(true, false) {
        <T, U, S_IN extends BaseStream<T, S_IN>>
        void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
            m.apply(source).forEach(e -> {
                synchronized (data) {
                    b.accept(e);
                }
            });
        }
    },

    // Wrap as parallel stream + forEach, synchronizing calls to the sink, and clear the SIZED flag
    PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) {
        <T, U, S_IN extends BaseStream<T, S_IN>>
        void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
            // Clear the SIZED flag by chaining a flag-declaring intermediate op
            S_IN pipe1 = (S_IN) OpTestCase.chain(source,
                                                 new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
            m.apply(pipe1).forEach(e -> {
                synchronized (data) {
                    b.accept(e);
                }
            });
        }
    },
    ;

    // The set of scenarios that clear the SIZED flag
    public static final Set<StreamTestScenario> CLEAR_SIZED_SCENARIOS = Collections.unmodifiableSet(
            EnumSet.of(PAR_STREAM_TO_ARRAY_CLEAR_SIZED, PAR_STREAM_FOR_EACH_CLEAR_SIZED));

    private final boolean isParallel;

    private final boolean isOrdered;

    StreamTestScenario(boolean isParallel) {
        this(isParallel, true);
    }

    StreamTestScenario(boolean isParallel, boolean isOrdered) {
        this.isParallel = isParallel;
        this.isOrdered = isOrdered;
    }

    public StreamShape getShape() {
        return StreamShape.REFERENCE;
    }

    public boolean isParallel() {
        return isParallel;
    }

    public boolean isOrdered() {
        return isOrdered;
    }

    // Obtain a fresh stream from the data source, run the scenario against it,
    // and close the stream when done
    public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
    void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m) {
        try (S_IN source = getStream(data)) {
            run(data, source, b, (Function<S_IN, Stream<U>>) m);
        }
    }

    // Scenario-specific traversal: apply m to the source stream and push the
    // resulting elements into the sink b
    abstract <T, U, S_IN extends BaseStream<T, S_IN>>
    void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m);

}