1 /*
   2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.HashSet;
  28 import java.util.LinkedHashSet;
  29 import java.util.Objects;
  30 import java.util.Set;
  31 import java.util.Spliterator;
  32 import java.util.concurrent.ConcurrentHashMap;
  33 import java.util.concurrent.atomic.AtomicBoolean;
  34 import java.util.function.IntFunction;
  35 
  36 /**
  37  * Factory methods for transforming streams into duplicate-free streams, using
  38  * {@link Object#equals(Object)} to determine equality.
  39  *
  40  * @since 1.8
  41  */
  42 final class DistinctOps {
  43 
  44     private DistinctOps() { }
  45 
  46     /**
  47      * Appends a "distinct" operation to the provided stream, and returns the
  48      * new stream.
  49      *
  50      * @param <T> The type of both input and output elements
  51      * @param upstream A reference stream with element type T
  52      * @return the new stream
  53      */
  54     static<T> ReferencePipeline<T, T> makeRef(AbstractPipeline<?, T, ?> upstream) {
  55         return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,
  56                                                       StreamOpFlag.IS_DISTINCT | StreamOpFlag.NOT_SIZED) {
  57             @Override
  58             <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
  59                                               Spliterator<P_IN> spliterator,
  60                                               IntFunction<T[]> generator) {
  61                 if (StreamOpFlag.DISTINCT.isKnown(helper.getStreamAndOpFlags())) {
  62                     // No-op
  63                     return helper.evaluate(spliterator, false, generator);
  64                 }
  65                 else if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
  66                     // If the stream is SORTED then it should also be ORDERED so the following will also
  67                     // preserve the sort order
  68                     TerminalOp<T, LinkedHashSet<T>> reduceOp
  69                             = ReduceOps.<T, LinkedHashSet<T>>makeRef(LinkedHashSet::new, LinkedHashSet::add,
  70                                                                      LinkedHashSet::addAll);
  71                     return Nodes.node(reduceOp.evaluateParallel(helper, spliterator));
  72                 }
  73                 else {
  74                     // Holder of null state since ConcurrentHashMap does not support null values
  75                     AtomicBoolean seenNull = new AtomicBoolean(false);
  76                     ConcurrentHashMap<T, Boolean> map = new ConcurrentHashMap<>();
  77                     TerminalOp<T, Void> forEachOp = ForEachOps.makeRef(t -> {
  78                         if (t == null)
  79                             seenNull.set(true);
  80                         else
  81                             map.putIfAbsent(t, Boolean.TRUE);
  82                     }, false);
  83                     forEachOp.evaluateParallel(helper, spliterator);
  84 
  85                     // If null has been seen then copy the key set into a HashSet that supports null values
  86                     // and add null
  87                     Set<T> keys = map.keySet();
  88                     if (seenNull.get()) {
  89                         // TODO Implement a more efficient set-union view, rather than copying
  90                         keys = new HashSet<>(keys);
  91                         keys.add(null);
  92                     }
  93                     return Nodes.node(keys);
  94                 }
  95             }
  96 
  97             @Override
  98             Sink<T> opWrapSink(int flags, Sink<T> sink) {
  99                 Objects.requireNonNull(sink);
 100 
 101                 if (StreamOpFlag.DISTINCT.isKnown(flags)) {
 102                     return sink;
 103                 }
 104                 else if (StreamOpFlag.SORTED.isKnown(flags)) {
 105                     return new Sink.ChainedReference<T>(sink) {
 106                         boolean seenNull;
 107                         T lastSeen;
 108 
 109                         @Override
 110                         public void begin(long size) {
 111                             seenNull = false;
 112                             lastSeen = null;
 113                             downstream.begin(-1);
 114                         }
 115 
 116                         @Override
 117                         public void end() {
 118                             seenNull = false;
 119                             lastSeen = null;
 120                             downstream.end();
 121                         }
 122 
 123                         @Override
 124                         public void accept(T t) {
 125                             if (t == null) {
 126                                 if (!seenNull) {
 127                                     seenNull = true;
 128                                     downstream.accept(lastSeen = null);
 129                                 }
 130                             } else if (lastSeen == null || !t.equals(lastSeen)) {
 131                                 downstream.accept(lastSeen = t);
 132                             }
 133                         }
 134                     };
 135                 }
 136                 else {
 137                     return new Sink.ChainedReference<T>(sink) {
 138                         Set<T> seen;
 139 
 140                         @Override
 141                         public void begin(long size) {
 142                             seen = new HashSet<>();
 143                             downstream.begin(-1);
 144                         }
 145 
 146                         @Override
 147                         public void end() {
 148                             seen = null;
 149                             downstream.end();
 150                         }
 151 
 152                         @Override
 153                         public void accept(T t) {
 154                             if (!seen.contains(t)) {
 155                                 seen.add(t);
 156                                 downstream.accept(t);
 157                             }
 158                         }
 159                     };
 160                 }
 161             }
 162         };
 163     }
 164 }