1 /*
   2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.HashSet;
  28 import java.util.LinkedHashSet;
  29 import java.util.Objects;
  30 import java.util.Set;
  31 import java.util.Spliterator;
  32 import java.util.concurrent.ConcurrentHashMap;
  33 import java.util.concurrent.atomic.AtomicBoolean;
  34 import java.util.function.IntFunction;
  35 
  36 /**
  37  * Factory methods for transforming streams into duplicate-free streams, using
  38  * {@link Object#equals(Object)} to determine equality.
  39  *
  40  * @since 1.8
  41  */
  42 final class DistinctOps {
  43 
  44     private DistinctOps() { }
  45 
  46     /**
  47      * Appends a "distinct" operation to the provided stream, and returns the
  48      * new stream.
  49      *
  50      * @param <T> the type of both input and output elements
  51      * @param upstream a reference stream with element type T
  52      * @return the new stream
  53      */
  54     static <T> ReferencePipeline<T, T> makeRef(AbstractPipeline<?, T, ?> upstream) {
  55         return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,
  56                                                       StreamOpFlag.IS_DISTINCT | StreamOpFlag.NOT_SIZED) {
  57 
  58             <P_IN> Node<T> reduce(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
  59                 // If the stream is SORTED then it should also be ORDERED so the following will also
  60                 // preserve the sort order
  61                 TerminalOp<T, LinkedHashSet<T>> reduceOp
  62                         = ReduceOps.<T, LinkedHashSet<T>>makeRef(LinkedHashSet::new, LinkedHashSet::add,
  63                                                                  LinkedHashSet::addAll);
  64                 return Nodes.node(reduceOp.evaluateParallel(helper, spliterator));
  65             }
  66 
  67             @Override
  68             <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
  69                                               Spliterator<P_IN> spliterator,
  70                                               IntFunction<T[]> generator) {
  71                 if (StreamOpFlag.DISTINCT.isKnown(helper.getStreamAndOpFlags())) {
  72                     // No-op
  73                     return helper.evaluate(spliterator, false, generator);
  74                 }
  75                 else if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
  76                     return reduce(helper, spliterator);
  77                 }
  78                 else {
  79                     // Holder of null state since ConcurrentHashMap does not support null values
  80                     AtomicBoolean seenNull = new AtomicBoolean(false);
  81                     ConcurrentHashMap<T, Boolean> map = new ConcurrentHashMap<>();
  82                     TerminalOp<T, Void> forEachOp = ForEachOps.makeRef(t -> {
  83                         if (t == null)
  84                             seenNull.set(true);
  85                         else
  86                             map.putIfAbsent(t, Boolean.TRUE);
  87                     }, false);
  88                     forEachOp.evaluateParallel(helper, spliterator);
  89 
  90                     // If null has been seen then copy the key set into a HashSet that supports null values
  91                     // and add null
  92                     Set<T> keys = map.keySet();
  93                     if (seenNull.get()) {
  94                         // TODO Implement a more efficient set-union view, rather than copying
  95                         keys = new HashSet<>(keys);
  96                         keys.add(null);
  97                     }
  98                     return Nodes.node(keys);
  99                 }
 100             }
 101 
 102             @Override
 103             <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
 104                 if (StreamOpFlag.DISTINCT.isKnown(helper.getStreamAndOpFlags())) {
 105                     // No-op
 106                     return helper.wrapSpliterator(spliterator);
 107                 }
 108                 else if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
 109                     // Not lazy, barrier required to preserve order
 110                     return reduce(helper, spliterator).spliterator();
 111                 }
 112                 else {
 113                     // Lazy
 114                     return new StreamSpliterators.DistinctSpliterator<>(helper.wrapSpliterator(spliterator));
 115                 }
 116             }
 117 
 118             @Override
 119             Sink<T> opWrapSink(int flags, Sink<T> sink) {
 120                 Objects.requireNonNull(sink);
 121 
 122                 if (StreamOpFlag.DISTINCT.isKnown(flags)) {
 123                     return sink;
 124                 } else if (StreamOpFlag.SORTED.isKnown(flags)) {
 125                     return new Sink.ChainedReference<T, T>(sink) {
 126                         boolean seenNull;
 127                         T lastSeen;
 128 
 129                         @Override
 130                         public void begin(long size) {
 131                             seenNull = false;
 132                             lastSeen = null;
 133                             downstream.begin(-1);
 134                         }
 135 
 136                         @Override
 137                         public void end() {
 138                             seenNull = false;
 139                             lastSeen = null;
 140                             downstream.end();
 141                         }
 142 
 143                         @Override
 144                         public void accept(T t) {
 145                             if (t == null) {
 146                                 if (!seenNull) {
 147                                     seenNull = true;
 148                                     downstream.accept(lastSeen = null);
 149                                 }
 150                             } else if (lastSeen == null || !t.equals(lastSeen)) {
 151                                 downstream.accept(lastSeen = t);
 152                             }
 153                         }
 154                     };
 155                 } else {
 156                     return new Sink.ChainedReference<T, T>(sink) {
 157                         Set<T> seen;
 158 
 159                         @Override
 160                         public void begin(long size) {
 161                             seen = new HashSet<>();
 162                             downstream.begin(-1);
 163                         }
 164 
 165                         @Override
 166                         public void end() {
 167                             seen = null;
 168                             downstream.end();
 169                         }
 170 
 171                         @Override
 172                         public void accept(T t) {
 173                             if (!seen.contains(t)) {
 174                                 seen.add(t);
 175                                 downstream.accept(t);
 176                             }
 177                         }
 178                     };
 179                 }
 180             }
 181         };
 182     }
 183 }