src/share/classes/java/util/stream/ReferencePipeline.java

Print this page
rev 7923 : 8023681: Fix raw type warning caused by Sink
Reviewed-by:

@@ -161,21 +161,20 @@
         Objects.requireNonNull(predicate);
         return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                      StreamOpFlag.NOT_SIZED) {
             @Override
             Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
-                return new Sink.ChainedReference<P_OUT>(sink) {
+                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                     @Override
                     public void begin(long size) {
                         downstream.begin(-1);
                     }
 
                     @Override
-                    @SuppressWarnings("unchecked")
                     public void accept(P_OUT u) {
                         if (predicate.test(u))
-                            downstream.accept((Object) u);
+                            downstream.accept(u);
                     }
                 };
             }
         };
     }

@@ -186,11 +185,11 @@
         Objects.requireNonNull(mapper);
         return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
             @Override
             Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
-                return new Sink.ChainedReference<P_OUT>(sink) {
+                return new Sink.ChainedReference<P_OUT, R>(sink) {
                     @Override
                     public void accept(P_OUT u) {
                         downstream.accept(mapper.apply(u));
                     }
                 };

@@ -203,11 +202,11 @@
         Objects.requireNonNull(mapper);
         return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
                                               StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
             @Override
             Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
-                return new Sink.ChainedReference<P_OUT>(sink) {
+                return new Sink.ChainedReference<P_OUT, Integer>(sink) {
                     @Override
                     public void accept(P_OUT u) {
                         downstream.accept(mapper.applyAsInt(u));
                     }
                 };

@@ -220,11 +219,11 @@
         Objects.requireNonNull(mapper);
         return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
                                       StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
             @Override
             Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
-                return new Sink.ChainedReference<P_OUT>(sink) {
+                return new Sink.ChainedReference<P_OUT, Long>(sink) {
                     @Override
                     public void accept(P_OUT u) {
                         downstream.accept(mapper.applyAsLong(u));
                     }
                 };

@@ -237,11 +236,11 @@
         Objects.requireNonNull(mapper);
         return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
             @Override
             Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) {
-                return new Sink.ChainedReference<P_OUT>(sink) {
+                return new Sink.ChainedReference<P_OUT, Double>(sink) {
                     @Override
                     public void accept(P_OUT u) {
                         downstream.accept(mapper.applyAsDouble(u));
                     }
                 };

@@ -255,18 +254,17 @@
         // We can do better than this, by polling cancellationRequested when stream is infinite
         return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
             @Override
             Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
-                return new Sink.ChainedReference<P_OUT>(sink) {
+                return new Sink.ChainedReference<P_OUT, R>(sink) {
                     @Override
                     public void begin(long size) {
                         downstream.begin(-1);
                     }
 
                     @Override
-                    @SuppressWarnings("unchecked")
                     public void accept(P_OUT u) {
                         // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
                         Stream<? extends R> result = mapper.apply(u);
                         if (result != null)
                             result.sequential().forEach(downstream);

@@ -282,11 +280,11 @@
         // We can do better than this, by polling cancellationRequested when stream is infinite
         return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
                                               StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
             @Override
             Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
-                return new Sink.ChainedReference<P_OUT>(sink) {
+                return new Sink.ChainedReference<P_OUT, Integer>(sink) {
                     IntConsumer downstreamAsInt = downstream::accept;
                     @Override
                     public void begin(long size) {
                         downstream.begin(-1);
                     }

@@ -309,11 +307,11 @@
         // We can do better than this, by polling cancellationRequested when stream is infinite
         return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
                                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
             @Override
             Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) {
-                return new Sink.ChainedReference<P_OUT>(sink) {
+                return new Sink.ChainedReference<P_OUT, Double>(sink) {
                     DoubleConsumer downstreamAsDouble = downstream::accept;
                     @Override
                     public void begin(long size) {
                         downstream.begin(-1);
                     }

@@ -336,11 +334,11 @@
         // We can do better than this, by polling cancellationRequested when stream is infinite
         return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
                                                    StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
             @Override
             Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
-                return new Sink.ChainedReference<P_OUT>(sink) {
+                return new Sink.ChainedReference<P_OUT, Long>(sink) {
                     LongConsumer downstreamAsLong = downstream::accept;
                     @Override
                     public void begin(long size) {
                         downstream.begin(-1);
                     }

@@ -362,13 +360,12 @@
         Objects.requireNonNull(tee);
         return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                      0) {
             @Override
             Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
-                return new Sink.ChainedReference<P_OUT>(sink) {
+                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                     @Override
-                    @SuppressWarnings("unchecked")
                     public void accept(P_OUT u) {
                         tee.accept(u);
                         downstream.accept(u);
                     }
                 };

@@ -493,10 +490,11 @@
     public final <R> R reduce(R identity, BiFunction<R, ? super P_OUT, R> accumulator, BinaryOperator<R> combiner) {
         return evaluate(ReduceOps.makeRef(identity, accumulator, combiner));
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public final <R, A> R collect(Collector<? super P_OUT, A, ? extends R> collector) {
         A container;
         if (isParallel()
                 && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
                 && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {