1 /* 2 * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.stream; 26 27 import java.util.Spliterator; 28 import java.util.concurrent.CountedCompleter; 29 import java.util.function.Supplier; 30 31 /** 32 * Utility methods useful in the implementation of stream operations ({@link IntermediateOp}, {@link StatefulOp}, 33 * {@link TerminalOp}). 34 * @since 1.8 35 */ 36 class OpUtils { 37 private OpUtils() { 38 throw new IllegalStateException("no instances"); 39 } 40 41 /** 42 * Given a {@link PipelineHelper} describing a stream source and a sequence of intermediate stream operations, 43 * compute the contents of the stream, in parallel, and pass the elements to the provided {@code Sink}. Elements 44 * will be passed to the {@code Sink} in whatever thread and whatever order they become available, independent 45 * of the stream's encounter order. 46 * 47 * @param helper A {@code PipelineHelper} describing the stream source and operations 48 * @param sink A {@code Sink} into which to deposit resulting elements 49 * @param <P_IN> The input type of the stream pipeline 50 * @param <P_OUT> The output type of the stream pipeline 51 */ 52 public static<P_IN, P_OUT> void parallelForEach(PipelineHelper<P_IN, P_OUT> helper, 53 Sink<P_IN> sink) { 54 new ForEachTask<>(helper, sink).invoke(); 55 } 56 57 /** 58 * Given a {@link PipelineHelper} describing a stream source and a sequence of intermediate stream operations, 59 * compute the contents of the stream, sequentially, and collect the results into a {@code Node}. The order 60 * of output elements will respect the encounter order of the source stream, and all computation will happen 61 * in the invoking thread. 62 * 63 * @param op An {@code IntermediateOp} representing the final operation to be implemented, typically 64 * a {@code StatefulOp} 65 * @param helper A {@code PipelineHelper} describing the stream source and operations 66 * @param <P_IN> The input type of the stream pipeline 67 * @param <P_OUT> The output type of the stream pipeline 68 * @return A {@code Node} containing the output of the stream pipeline 69 */ 70 static<P_IN, P_OUT> Node<P_OUT> evaluateSequential(IntermediateOp<P_OUT, P_OUT> op, 71 PipelineHelper<P_IN, P_OUT> helper) { 72 long sizeIfKnown = StreamOpFlag.SIZED.isPreserved(op.getOpFlags()) 73 ? helper.exactOutputSizeIfKnown(helper.sourceSpliterator()) 74 : -1; 75 final Node.Builder<P_OUT> nb = helper.makeNodeBuilder(sizeIfKnown); 76 Sink<P_OUT> opSink = op.wrapSink(helper.getStreamAndOpFlags(), nb); 77 78 if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(op.getOpFlags())) 79 helper.into(opSink, helper.sourceSpliterator()); 80 else 81 helper.intoWrappedWithCancel(helper.wrapSink(opSink), helper.sourceSpliterator()); 82 return nb.build(); 83 } 84 85 /** 86 * Given a {@link PipelineHelper} describing a stream source and a sequence of intermediate stream operations, 87 * perform a reduction operation on the stream, in parallel, using the supplied {@link AccumulatingSink} type. 88 * The {@code AccumulatingSink} must represent an associative reducing operation. 89 * 90 * @param helper A {@code PipelineHelper} describing the stream source and operations 91 * @param factory A {@code Supplier} for an {@code AccumulatingSink} which performs the associative reducing 92 * operation 93 * @param <P_IN> The input type of the stream pipeline 94 * @param <P_OUT> The output type of the stream pipeline 95 * @param <R> The result type of the reduction operation 96 * @param <S> The type of the {@code AccumulatingSink} 97 * @return The result of the reduction operation 98 */ 99 public static<P_IN, P_OUT, R, S extends AccumulatingSink<P_OUT, R, S>> 100 R parallelReduce(PipelineHelper<P_IN, P_OUT> helper, Supplier<S> factory) { 101 S sink = new ReduceTask<>(helper, factory).invoke(); 102 return sink.getAndClearState(); 103 } 104 105 /** 106 * A type of {@code TerminalSink} that implements an associative reducing operation on elements of type 107 * {@code T} and producing a result of type {@code R}. 108 * 109 * @param <T> The type of input element to the combining operation 110 * @param <R> The result type 111 * @param <K> The type of the {@code AccumulatingSink} 112 */ 113 public interface AccumulatingSink<T, R, K extends AccumulatingSink<T, R, K>> extends TerminalSink<T, R> { 114 public void combine(K other); 115 public void clearState(); 116 } 117 118 119 /** A {@code ForkJoinTask} for performing a parallel for-each operation */ 120 private static class ForEachTask<S, T> extends AbstractTask<S, T, Void, ForEachTask<S, T>> { 121 // @@@ Extending AbstractTask here is potentially inefficient, since we don't really need to 122 // keep track of the structure of the computation tree 123 private final Sink<S> sink; 124 125 private ForEachTask(PipelineHelper<S, T> helper, Sink<S> sink) { 126 super(helper); 127 this.sink = sink; 128 } 129 130 private ForEachTask(ForEachTask<S, T> parent, Spliterator<S> spliterator, Sink<S> sink) { 131 super(parent, spliterator); 132 this.sink = sink; 133 } 134 135 @Override 136 public boolean suggestSplit() { 137 boolean suggest = super.suggestSplit(); 138 if (StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags())) 139 suggest = suggest && !sink.cancellationRequested(); 140 return suggest; 141 } 142 143 @Override 144 protected ForEachTask<S, T> makeChild(Spliterator<S> spliterator) { 145 return new ForEachTask<>(this, spliterator, sink); 146 } 147 148 @Override 149 protected Void doLeaf() { 150 helper.intoWrapped(sink, spliterator); 151 return null; 152 } 153 } 154 155 /** A {@code ForkJoinTask} for performing a parallel reduce operation */ 156 private static class ReduceTask<P_IN, P_OUT, R, S extends AccumulatingSink<P_OUT, R, S>> 157 extends AbstractTask<P_IN, P_OUT, S, ReduceTask<P_IN, P_OUT, R, S>> { 158 private final Supplier<S> sinkFactory; 159 160 private ReduceTask(PipelineHelper<P_IN, P_OUT> helper, Supplier<S> sinkFactory) { 161 super(helper); 162 this.sinkFactory = sinkFactory; 163 } 164 165 private ReduceTask(ReduceTask<P_IN, P_OUT, R, S> parent, Spliterator<P_IN> spliterator) { 166 super(parent, spliterator); 167 this.sinkFactory = parent.sinkFactory; 168 } 169 170 @Override 171 protected ReduceTask<P_IN, P_OUT, R, S> makeChild(Spliterator<P_IN> spliterator) { 172 return new ReduceTask<>(this, spliterator); 173 } 174 175 @Override 176 protected S doLeaf() { 177 return helper.into(sinkFactory.get(), spliterator); 178 } 179 180 @Override 181 public void onCompletion(CountedCompleter caller) { 182 if (!isLeaf()) { 183 ReduceTask<P_IN, P_OUT, R, S> child = children; 184 S result = child.getLocalResult(); 185 child = child.nextSibling; 186 for (; child != null; child = child.nextSibling) { 187 S otherResult = child.getLocalResult(); 188 result.combine(otherResult); 189 otherResult.clearState(); 190 } 191 setLocalResult(result); 192 } 193 } 194 } 195 }