131
132 @Override
133 public void combine(ReducingSink other) {
134 if (!other.empty)
135 accept(other.state);
136 }
137 }
138 return new ReduceOp<T, Optional<T>, ReducingSink>(StreamShape.REFERENCE) {
139 @Override
140 public ReducingSink makeSink() {
141 return new ReducingSink();
142 }
143 };
144 }
145
146 /**
147 * Constructs a {@code TerminalOp} that implements a mutable reduce on
148 * reference values.
149 *
150 * @param <T> the type of the input elements
151 * @param <R> the type of the result
152 * @param collector a {@code Collector} defining the reduction
153 * @return a {@code ReduceOp} implementing the reduction
154 */
155 public static <T,R> TerminalOp<T, R>
156 makeRef(Collector<? super T,R> collector) {
157 Supplier<R> supplier = Objects.requireNonNull(collector).resultSupplier();
158 BiFunction<R, ? super T, R> accumulator = collector.accumulator();
159 BinaryOperator<R> combiner = collector.combiner();
160 class ReducingSink extends Box<R>
161 implements AccumulatingSink<T, R, ReducingSink> {
162 @Override
163 public void begin(long size) {
164 state = supplier.get();
165 }
166
167 @Override
168 public void accept(T t) {
169 R newResult = accumulator.apply(state, t);
170 if (state != newResult)
171 state = newResult;
172 }
173
174 @Override
175 public void combine(ReducingSink other) {
176 state = combiner.apply(state, other.state);
177 }
178 }
179 return new ReduceOp<T, R, ReducingSink>(StreamShape.REFERENCE) {
180 @Override
181 public ReducingSink makeSink() {
182 return new ReducingSink();
183 }
184
185 @Override
186 public int getOpFlags() {
187 return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
188 ? StreamOpFlag.NOT_ORDERED
189 : 0;
190 }
191 };
192 }
193
194 /**
195 * Constructs a {@code TerminalOp} that implements a mutable reduce on
196 * reference values.
197 *
198 * @param <T> the type of the input elements
199 * @param <R> the type of the result
|
131
132 @Override
133 public void combine(ReducingSink other) {
134 if (!other.empty)
135 accept(other.state);
136 }
137 }
138 return new ReduceOp<T, Optional<T>, ReducingSink>(StreamShape.REFERENCE) {
139 @Override
140 public ReducingSink makeSink() {
141 return new ReducingSink();
142 }
143 };
144 }
145
146 /**
147 * Constructs a {@code TerminalOp} that implements a mutable reduce on
148 * reference values.
149 *
150 * @param <T> the type of the input elements
151 * @param <I> the type of the intermediate reduction result
152 * @param <I> the type of the final reduction result
153 * @param collector a {@code Collector} defining the reduction
154 * @return a {@code ReduceOp} implementing the reduction
155 */
156 public static <T, I, R> TerminalOp<T, I>
157 makeRef(Collector<? super T, I, R> collector) {
158 Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
159 BiConsumer<I, ? super T> accumulator = collector.accumulator();
160 BinaryOperator<I> combiner = collector.combiner();
161 class ReducingSink extends Box<I>
162 implements AccumulatingSink<T, I, ReducingSink> {
163 @Override
164 public void begin(long size) {
165 state = supplier.get();
166 }
167
168 @Override
169 public void accept(T t) {
170 accumulator.accept(state, t);
171 }
172
173 @Override
174 public void combine(ReducingSink other) {
175 state = combiner.apply(state, other.state);
176 }
177 }
178 return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
179 @Override
180 public ReducingSink makeSink() {
181 return new ReducingSink();
182 }
183
184 @Override
185 public int getOpFlags() {
186 return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
187 ? StreamOpFlag.NOT_ORDERED
188 : 0;
189 }
190 };
191 }
192
193 /**
194 * Constructs a {@code TerminalOp} that implements a mutable reduce on
195 * reference values.
196 *
197 * @param <T> the type of the input elements
198 * @param <R> the type of the result
|