/*
 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package java.util.stream;

import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set;
import java.util.Spliterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntFunction;

/**
 * Factory methods for transforming streams into duplicate-free streams, using
 * {@link Object#equals(Object)} to determine equality.
 *
 * @since 1.8
 */
final class DistinctOps {

    /** Not instantiable; this class only hosts static factory methods. */
    private DistinctOps() { }

    /**
     * Appends a "distinct" operation to the provided stream, and returns the
     * new stream.
     *
     * <p>The returned stage picks its strategy from the stream flags it is
     * handed:
     * <ul>
     * <li>If the input is already known to be DISTINCT, elements are passed
     *     through untouched.</li>
     * <li>Sequentially, if the input is known to be SORTED, each element is
     *     compared only with the previously emitted one; otherwise a
     *     {@link HashSet} of the elements seen so far is kept.</li>
     * <li>In parallel, if the input is known to be ORDERED, elements are
     *     reduced into a {@link LinkedHashSet}; otherwise they are collected
     *     concurrently into the key set of a {@link ConcurrentHashMap}, with
     *     {@code null} tracked separately.</li>
     * </ul>
     *
     * @param <T> The type of both input and output elements
     * @param upstream A reference stream with element type T
     * @return the new stream
     */
    static <T> ReferencePipeline<T, T> makeRef(AbstractPipeline<?, T, ?> upstream) {
        return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,
                                                      StreamOpFlag.IS_DISTINCT | StreamOpFlag.NOT_SIZED) {
            @Override
            <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
                                              Spliterator<P_IN> spliterator,
                                              IntFunction<T[]> generator) {
                if (StreamOpFlag.DISTINCT.isKnown(helper.getStreamAndOpFlags())) {
                    // No-op
                    return helper.evaluate(spliterator, false, generator);
                } else if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
                    // If the stream is SORTED then it should also be ORDERED so the following will also
                    // preserve the sort order
                    TerminalOp<T, LinkedHashSet<T>> reduceOp
                            = ReduceOps.<T, LinkedHashSet<T>>makeRef(LinkedHashSet::new, LinkedHashSet::add,
                                                                     LinkedHashSet::addAll);
                    return Nodes.node(reduceOp.evaluateParallel(helper, spliterator));
                } else {
                    // ConcurrentHashMap does not permit null keys, so the presence of a null
                    // element is recorded in a separate flag
                    AtomicBoolean seenNull = new AtomicBoolean(false);
                    ConcurrentHashMap<T, Boolean> map = new ConcurrentHashMap<>();
                    TerminalOp<T, Void> forEachOp = ForEachOps.makeRef(t -> {
                        if (t == null) {
                            seenNull.set(true);
                        } else {
                            map.putIfAbsent(t, Boolean.TRUE);
                        }
                    }, false);
                    forEachOp.evaluateParallel(helper, spliterator);

                    // If null has been seen then copy the key set into a HashSet that supports null values
                    // and add null
                    Set<T> keys = map.keySet();
                    if (seenNull.get()) {
                        // TODO Implement a more efficient set-union view, rather than copying
                        keys = new HashSet<>(keys);
                        keys.add(null);
                    }
                    return Nodes.node(keys);
                }
            }

            @Override
            Sink<T> opWrapSink(int flags, Sink<T> sink) {
                Objects.requireNonNull(sink);

                if (StreamOpFlag.DISTINCT.isKnown(flags)) {
                    // Input is already duplicate-free; nothing to filter
                    return sink;
                } else if (StreamOpFlag.SORTED.isKnown(flags)) {
                    // Only the most recently emitted element (and whether null has been
                    // emitted) is remembered, rather than every element seen
                    return new Sink.ChainedReference<T>(sink) {
                        boolean seenNull;
                        T lastSeen;

                        @Override
                        public void begin(long size) {
                            seenNull = false;
                            lastSeen = null;
                            // Output size is not known up front
                            downstream.begin(-1);
                        }

                        @Override
                        public void end() {
                            seenNull = false;
                            lastSeen = null;
                            downstream.end();
                        }

                        @Override
                        public void accept(T t) {
                            if (t == null) {
                                // Forward null at most once
                                if (!seenNull) {
                                    seenNull = true;
                                    downstream.accept(lastSeen = null);
                                }
                            } else if (lastSeen == null || !t.equals(lastSeen)) {
                                downstream.accept(lastSeen = t);
                            }
                        }
                    };
                } else {
                    // General case: remember every element seen so far
                    return new Sink.ChainedReference<T>(sink) {
                        Set<T> seen;

                        @Override
                        public void begin(long size) {
                            seen = new HashSet<>();
                            // Output size is not known up front
                            downstream.begin(-1);
                        }

                        @Override
                        public void end() {
                            // Drop the reference to the set of seen elements
                            seen = null;
                            downstream.end();
                        }

                        @Override
                        public void accept(T t) {
                            // Set.add reports whether the element was newly inserted, so a
                            // separate contains() lookup is unnecessary
                            if (seen.add(t)) {
                                downstream.accept(t);
                            }
                        }
                    };
                }
            }
        };
    }
}