# HG changeset patch # User henryjen # Date 1377722761 25200 # Node ID cc08040c99a904771d0df7a91a557b2368dc4bc0 # Parent 3575263eefab4fdf8e4047b774f8aa487f96d96f 8017513: Support for closeable streams 8022237: j.u.s.BaseStream.onClose() has an issue in implementation or requires spec clarification 8022572: Same exception instances thrown from j.u.stream.Stream.onClose() handlers are not listed as suppressed Summary: BaseStream implements AutoCloseable; Remove CloseableStream and DelegatingStream Reviewed-by: Contributed-by: brian.goetz@oracle.com diff --git a/src/share/classes/java/nio/file/Files.java b/src/share/classes/java/nio/file/Files.java --- a/src/share/classes/java/nio/file/Files.java +++ b/src/share/classes/java/nio/file/Files.java @@ -38,14 +38,52 @@ import java.io.Writer; import java.io.BufferedReader; import java.io.BufferedWriter; +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; import java.io.InputStreamReader; +import java.io.OutputStream; import java.io.OutputStreamWriter; -import java.io.IOException; +import java.io.Reader; import java.io.UncheckedIOException; -import java.util.*; +import java.io.Writer; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.SeekableByteChannel; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.CharsetEncoder; +import java.nio.file.attribute.BasicFileAttributeView; +import java.nio.file.attribute.BasicFileAttributes; +import java.nio.file.attribute.DosFileAttributes; +import java.nio.file.attribute.FileAttribute; +import java.nio.file.attribute.FileAttributeView; +import java.nio.file.attribute.FileOwnerAttributeView; +import java.nio.file.attribute.FileStoreAttributeView; +import java.nio.file.attribute.FileTime; +import java.nio.file.attribute.PosixFileAttributeView; +import java.nio.file.attribute.PosixFileAttributes; +import 
java.nio.file.attribute.PosixFilePermission; +import java.nio.file.attribute.UserPrincipal; +import java.nio.file.spi.FileSystemProvider; +import java.nio.file.spi.FileTypeDetector; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.ServiceLoader; +import java.util.Set; +import java.util.Spliterator; +import java.util.Spliterators; import java.util.function.BiPredicate; -import java.util.stream.CloseableStream; -import java.util.stream.DelegatingStream; import java.util.stream.Stream; import java.util.stream.StreamSupport; import java.security.AccessController; @@ -74,6 +112,21 @@ return path.getFileSystem().provider(); } + /** + * Convert a Closeable to a Runnable by converting checked IOException + * to UncheckedIOException + */ + private static Runnable asUncheckedRunnable(Closeable c) { + return () -> { + try { + c.close(); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + }; + } + // -- File contents -- /** @@ -3228,29 +3281,7 @@ // -- Stream APIs -- /** - * Implementation of CloseableStream - */ - private static class DelegatingCloseableStream extends DelegatingStream - implements CloseableStream - { - private final Closeable closeable; - - DelegatingCloseableStream(Closeable c, Stream delegate) { - super(delegate); - this.closeable = c; - } - - public void close() { - try { - closeable.close(); - } catch (IOException ex) { - throw new UncheckedIOException(ex); - } - } - } - - /** - * Return a lazily populated {@code CloseableStream}, the elements of + * Return a lazily populated {@code Stream}, the elements of * which are the entries in the directory. The listing is not recursive. * *

The elements of the stream are {@link Path} objects that are @@ -3264,10 +3295,13 @@ * reflect updates to the directory that occur after returning from this * method. * - *

When not using the try-with-resources construct, then the stream's - * {@link CloseableStream#close close} method should be invoked after the - * operation is completed so as to free any resources held for the open - * directory. Operating on a closed stream behaves as if the end of stream + *

The returned stream encapsulates a {@link DirectoryStream}. + * If timely disposal of file system resources is required, the + * {@code try}-with-resources construct should be used to ensure that the + * stream's {@link Stream#close close} method is invoked after the stream + * operations are completed. + * + *

Operating on a closed stream behaves as if the end of stream * has been reached. Due to read-ahead, one or more elements may be * returned after the stream has been closed. * @@ -3278,7 +3312,7 @@ * * @param dir The path to the directory * - * @return The {@code CloseableStream} describing the content of the + * @return The {@code Stream} describing the content of the * directory * * @throws NotDirectoryException @@ -3294,43 +3328,46 @@ * @see #newDirectoryStream(Path) * @since 1.8 */ - public static CloseableStream list(Path dir) throws IOException { + public static Stream list(Path dir) throws IOException { DirectoryStream ds = Files.newDirectoryStream(dir); - final Iterator delegate = ds.iterator(); + try { + final Iterator delegate = ds.iterator(); - // Re-wrap DirectoryIteratorException to UncheckedIOException - Iterator it = new Iterator() { - public boolean hasNext() { - try { - return delegate.hasNext(); - } catch (DirectoryIteratorException e) { - throw new UncheckedIOException(e.getCause()); + // Re-wrap DirectoryIteratorException to UncheckedIOException + Iterator it = new Iterator() { + public boolean hasNext() { + try { + return delegate.hasNext(); + } catch (DirectoryIteratorException e) { + throw new UncheckedIOException(e.getCause()); + } } - } - public Path next() { - try { - return delegate.next(); - } catch (DirectoryIteratorException e) { - throw new UncheckedIOException(e.getCause()); + public Path next() { + try { + return delegate.next(); + } catch (DirectoryIteratorException e) { + throw new UncheckedIOException(e.getCause()); + } } - } - }; + }; - Stream s = StreamSupport.stream( - Spliterators.spliteratorUnknownSize(it, Spliterator.DISTINCT), - false); - return new DelegatingCloseableStream<>(ds, s); + return StreamSupport.stream(Spliterators.spliteratorUnknownSize(it, Spliterator.DISTINCT), false) + .onClose(asUncheckedRunnable(ds)); + } catch (Error|RuntimeException e) { + ds.close(); + throw e; + } } /** - * Return a {@code 
CloseableStream} that is lazily populated with {@code + * Return a {@code Stream} that is lazily populated with {@code * Path} by walking the file tree rooted at a given starting file. The * file tree is traversed depth-first, the elements in the stream * are {@link Path} objects that are obtained as if by {@link * Path#resolve(Path) resolving} the relative path against {@code start}. * *

The {@code stream} walks the file tree as elements are consumed. - * The {@code CloseableStream} returned is guaranteed to have at least one + * The {@code Stream} returned is guaranteed to have at least one * element, the starting file itself. For each file visited, the stream * attempts to read its {@link BasicFileAttributes}. If the file is a * directory and can be opened successfully, entries in the directory, and @@ -3370,10 +3407,11 @@ *

When a security manager is installed and it denies access to a file * (or directory), then it is ignored and not included in the stream. * - *

When not using the try-with-resources construct, then the stream's - * {@link CloseableStream#close close} method should be invoked after the - * operation is completed so as to free any resources held for the open - * directory. Operate the stream after it is closed will throw an + *

The returned stream encapsulates one or more {@link DirectoryStream}s. + * If timely disposal of file system resources is required, the + * {@code try}-with-resources construct should be used to ensure that the + * stream's {@link Stream#close close} method is invoked after the stream + * operations are completed. Operating on a closed stream will result in an * {@link java.lang.IllegalStateException}. * *

If an {@link IOException} is thrown when accessing the directory @@ -3388,7 +3426,7 @@ * @param options * options to configure the traversal * - * @return the {@link CloseableStream} of {@link Path} + * @return the {@link Stream} of {@link Path} * * @throws IllegalArgumentException * if the {@code maxDepth} parameter is negative @@ -3401,21 +3439,22 @@ * if an I/O error is thrown when accessing the starting file. * @since 1.8 */ - public static CloseableStream walk(Path start, int maxDepth, - FileVisitOption... options) - throws IOException - { + public static Stream walk(Path start, int maxDepth, + FileVisitOption... options) + throws IOException { FileTreeIterator iterator = new FileTreeIterator(start, maxDepth, options); - - Stream s = StreamSupport.stream( - Spliterators.spliteratorUnknownSize(iterator, Spliterator.DISTINCT), - false). - map(entry -> entry.file()); - return new DelegatingCloseableStream<>(iterator, s); + try { + return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.DISTINCT), false) + .onClose(iterator::close) + .map(entry -> entry.file()); + } catch (Error|RuntimeException e) { + iterator.close(); + throw e; + } } /** - * Return a {@code CloseableStream} that is lazily populated with {@code + * Return a {@code Stream} that is lazily populated with {@code * Path} by walking the file tree rooted at a given starting file. The * file tree is traversed depth-first, the elements in the stream * are {@link Path} objects that are obtained as if by {@link @@ -3428,12 +3467,19 @@ * * In other words, it visits all levels of the file tree. * + *

The returned stream encapsulates one or more {@link DirectoryStream}s. + * If timely disposal of file system resources is required, the + * {@code try}-with-resources construct should be used to ensure that the + * stream's {@link Stream#close close} method is invoked after the stream + * operations are completed. Operating on a closed stream will result in an + * {@link java.lang.IllegalStateException}. + * * @param start * the starting file * @param options * options to configure the traversal * - * @return the {@link CloseableStream} of {@link Path} + * @return the {@link Stream} of {@link Path} * * @throws SecurityException * If the security manager denies access to the starting file. @@ -3446,15 +3492,14 @@ * @see #walk(Path, int, FileVisitOption...) * @since 1.8 */ - public static CloseableStream walk(Path start, - FileVisitOption... options) - throws IOException - { + public static Stream walk(Path start, + FileVisitOption... options) + throws IOException { return walk(start, Integer.MAX_VALUE, options); } /** - * Return a {@code CloseableStream} that is lazily populated with {@code + * Return a {@code Stream} that is lazily populated with {@code * Path} by searching for files in a file tree rooted at a given starting * file. * @@ -3463,12 +3508,19 @@ * {@link BiPredicate} is invoked with its {@link Path} and {@link * BasicFileAttributes}. The {@code Path} object is obtained as if by * {@link Path#resolve(Path) resolving} the relative path against {@code - * start} and is only included in the returned {@link CloseableStream} if + * start} and is only included in the returned {@link Stream} if * the {@code BiPredicate} returns true. Compare to calling {@link * java.util.stream.Stream#filter filter} on the {@code Stream} * returned by {@code walk} method, this method may be more efficient by * avoiding redundant retrieval of the {@code BasicFileAttributes}. * + *

The returned stream encapsulates one or more {@link DirectoryStream}s. + * If timely disposal of file system resources is required, the + * {@code try}-with-resources construct should be used to ensure that the + * stream's {@link Stream#close close} method is invoked after the stream + * operations are completed. Operating on a closed stream will result in an + * {@link java.lang.IllegalStateException}. + * *

If an {@link IOException} is thrown when accessing the directory * after returned from this method, it is wrapped in an {@link * UncheckedIOException} which will be thrown from the method that caused @@ -3484,7 +3536,7 @@ * @param options * options to configure the traversal * - * @return the {@link CloseableStream} of {@link Path} + * @return the {@link Stream} of {@link Path} * * @throws IllegalArgumentException * if the {@code maxDepth} parameter is negative @@ -3499,24 +3551,25 @@ * @see #walk(Path, int, FileVisitOption...) * @since 1.8 */ - public static CloseableStream find(Path start, - int maxDepth, - BiPredicate matcher, - FileVisitOption... options) - throws IOException - { + public static Stream find(Path start, + int maxDepth, + BiPredicate matcher, + FileVisitOption... options) + throws IOException { FileTreeIterator iterator = new FileTreeIterator(start, maxDepth, options); - - Stream s = StreamSupport.stream( - Spliterators.spliteratorUnknownSize(iterator, Spliterator.DISTINCT), - false). - filter(entry -> matcher.test(entry.file(), entry.attributes())). - map(entry -> entry.file()); - return new DelegatingCloseableStream<>(iterator, s); + try { + return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.DISTINCT), false) + .onClose(iterator::close) + .filter(entry -> matcher.test(entry.file(), entry.attributes())) + .map(entry -> entry.file()); + } catch (Error|RuntimeException e) { + iterator.close(); + throw e; + } } /** - * Read all lines from a file as a {@code CloseableStream}. Unlike {@link + * Read all lines from a file as a {@code Stream}. Unlike {@link * #readAllLines(Path, Charset) readAllLines}, this method does not read * all lines into a {@code List}, but instead populates lazily as the stream * is consumed. @@ -3528,22 +3581,24 @@ *

After this method returns, then any subsequent I/O exception that * occurs while reading from the file or when a malformed or unmappable byte * sequence is read, is wrapped in an {@link UncheckedIOException} that will - * be thrown form the + * be thrown from the * {@link java.util.stream.Stream} method that caused the read to take * place. In case an {@code IOException} is thrown when closing the file, * it is also wrapped as an {@code UncheckedIOException}. * - *

When not using the try-with-resources construct, then stream's - * {@link CloseableStream#close close} method should be invoked after - * operation is completed so as to free any resources held for the open - * file. + *

The returned stream encapsulates a {@link Reader}. If timely + * disposal of file system resources is required, the try-with-resources + * construct should be used to ensure that the stream's + * {@link Stream#close close} method is invoked after the stream operations + * are completed. + * * * @param path * the path to the file * @param cs * the charset to use for decoding * - * @return the lines from the file as a {@code CloseableStream} + * @return the lines from the file as a {@code Stream} * * @throws IOException * if an I/O error occurs opening the file @@ -3557,10 +3612,13 @@ * @see java.io.BufferedReader#lines() * @since 1.8 */ - public static CloseableStream lines(Path path, Charset cs) - throws IOException - { + public static Stream lines(Path path, Charset cs) throws IOException { BufferedReader br = Files.newBufferedReader(path, cs); - return new DelegatingCloseableStream<>(br, br.lines()); + try { + return br.lines().onClose(asUncheckedRunnable(br)); + } catch (Error|RuntimeException e) { + br.close(); + throw e; + } } } diff --git a/src/share/classes/java/util/stream/AbstractPipeline.java b/src/share/classes/java/util/stream/AbstractPipeline.java --- a/src/share/classes/java/util/stream/AbstractPipeline.java +++ b/src/share/classes/java/util/stream/AbstractPipeline.java @@ -71,6 +71,9 @@ */ abstract class AbstractPipeline> extends PipelineHelper implements BaseStream { + private static final String MSG_STREAM_LINKED = "stream has already been operated upon or closed"; + private static final String MSG_CONSUMED = "source already consumed or closed"; + /** * Backlink to the head of the pipeline chain (self if this is the source * stage). @@ -137,6 +140,8 @@ */ private boolean sourceAnyStateful; + private Runnable sourceCloseAction; + /** * True if pipeline is parallel, otherwise the pipeline is sequential; only * valid for the source stage. 
@@ -195,7 +200,7 @@ */ AbstractPipeline(AbstractPipeline previousStage, int opFlags) { if (previousStage.linkedOrConsumed) - throw new IllegalStateException("stream has already been operated upon"); + throw new IllegalStateException(MSG_STREAM_LINKED); previousStage.linkedOrConsumed = true; previousStage.nextStage = this; @@ -221,7 +226,7 @@ final R evaluate(TerminalOp terminalOp) { assert getOutputShape() == terminalOp.inputShape(); if (linkedOrConsumed) - throw new IllegalStateException("stream has already been operated upon"); + throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; return isParallel() @@ -238,7 +243,7 @@ @SuppressWarnings("unchecked") final Node evaluateToArrayNode(IntFunction generator) { if (linkedOrConsumed) - throw new IllegalStateException("stream has already been operated upon"); + throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; // If the last intermediate operation is stateful then @@ -266,7 +271,7 @@ throw new IllegalStateException(); if (linkedOrConsumed) - throw new IllegalStateException("stream has already been operated upon"); + throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; if (sourceStage.sourceSpliterator != null) { @@ -282,7 +287,7 @@ return s; } else { - throw new IllegalStateException("source already consumed"); + throw new IllegalStateException(MSG_CONSUMED); } } @@ -302,12 +307,35 @@ return (S) this; } + @Override + public void close() { + linkedOrConsumed = true; + sourceSupplier = null; + sourceSpliterator = null; + if (sourceStage.sourceCloseAction != null) { + Runnable closeAction = sourceStage.sourceCloseAction; + sourceStage.sourceCloseAction = null; + closeAction.run(); + } + } + + @Override + @SuppressWarnings("unchecked") + public S onClose(Runnable closeHandler) { + Runnable existingHandler = sourceStage.sourceCloseAction; + sourceStage.sourceCloseAction = + (existingHandler == null) + ? 
closeHandler + : Streams.composeWithExceptions(existingHandler, closeHandler); + return (S) this; + } + // Primitive specialization use co-variant overrides, hence is not final @Override @SuppressWarnings("unchecked") public Spliterator spliterator() { if (linkedOrConsumed) - throw new IllegalStateException("stream has already been operated upon"); + throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; if (this == sourceStage) { @@ -324,7 +352,7 @@ return lazySpliterator(s); } else { - throw new IllegalStateException("source already consumed"); + throw new IllegalStateException(MSG_CONSUMED); } } else { @@ -424,7 +452,7 @@ sourceStage.sourceSupplier = null; } else { - throw new IllegalStateException("source already consumed"); + throw new IllegalStateException(MSG_CONSUMED); } if (isParallel()) { diff --git a/src/share/classes/java/util/stream/BaseStream.java b/src/share/classes/java/util/stream/BaseStream.java --- a/src/share/classes/java/util/stream/BaseStream.java +++ b/src/share/classes/java/util/stream/BaseStream.java @@ -35,7 +35,8 @@ * @param type of stream implementing {@code BaseStream} * @since 1.8 */ -public interface BaseStream> { +public interface BaseStream> + extends AutoCloseable { /** * Returns an iterator for the elements of this stream. * @@ -103,4 +104,33 @@ * @return an unordered stream */ S unordered(); + + /** + * Returns an equivalent stream with an additional close handler. Close + * handlers are run when the {@link #close()} method + * is called on the stream, and are executed in the order they were + * added. All close handlers are run, even if earlier close handlers throw + * exceptions. 
If any close handler throws an exception, the first + * exception thrown will be relayed to the caller of {@code close()}, with + * any remaining exceptions added to that exception as suppressed exceptions + * (unless one of the remaining exceptions is the same exception as the + * first exception, since an exception cannot suppress itself.) May + * return itself. + * + *

This is an intermediate + * operation. + * + * @param closeHandler A task to execute when the stream is closed + * @return a stream with a handler that is run if the stream is closed + */ + S onClose(Runnable closeHandler); + + /** + * Closes this stream, causing all close handlers for this stream pipeline + * to be called. + * + * @see AutoCloseable#close() + */ + @Override + void close(); } diff --git a/src/share/classes/java/util/stream/CloseableStream.java b/src/share/classes/java/util/stream/CloseableStream.java deleted file mode 100644 --- a/src/share/classes/java/util/stream/CloseableStream.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ - -package java.util.stream; - -/** - * A {@code CloseableStream} is a {@code Stream} that can be closed. 
- * The close method is invoked to release resources that the object is - * holding (such as open files). - * - * @param The type of stream elements - * @since 1.8 - */ -public interface CloseableStream extends Stream, AutoCloseable { - - /** - * Closes this resource, relinquishing any underlying resources. - * This method is invoked automatically on objects managed by the - * {@code try}-with-resources statement. Does nothing if called when - * the resource has already been closed. - * - * This method does not allow throwing checked {@code Exception}s like - * {@link AutoCloseable#close() AutoCloseable.close()}. Cases where the - * close operation may fail require careful attention by implementers. It - * is strongly advised to relinquish the underlying resources and to - * internally mark the resource as closed. The {@code close} - * method is unlikely to be invoked more than once and so this ensures - * that the resources are released in a timely manner. Furthermore it - * reduces problems that could arise when the resource wraps, or is - * wrapped, by another resource. - * - * @see AutoCloseable#close() - */ - void close(); -} diff --git a/src/share/classes/java/util/stream/DelegatingStream.java b/src/share/classes/java/util/stream/DelegatingStream.java deleted file mode 100644 --- a/src/share/classes/java/util/stream/DelegatingStream.java +++ /dev/null @@ -1,270 +0,0 @@ -/* - * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. 
- * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ - -package java.util.stream; - -import java.util.Comparator; -import java.util.Iterator; -import java.util.Objects; -import java.util.Optional; -import java.util.Spliterator; -import java.util.function.BiConsumer; -import java.util.function.BiFunction; -import java.util.function.BinaryOperator; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.function.IntFunction; -import java.util.function.Predicate; -import java.util.function.Supplier; -import java.util.function.ToDoubleFunction; -import java.util.function.ToIntFunction; -import java.util.function.ToLongFunction; - -/** - * A {@code Stream} implementation that delegates operations to another {@code - * Stream}. - * - * @param type of stream elements for this stream and underlying delegate - * stream - * - * @since 1.8 - */ -public class DelegatingStream implements Stream { - final private Stream delegate; - - /** - * Construct a {@code Stream} that delegates operations to another {@code - * Stream}. 
- * - * @param delegate the underlying {@link Stream} to which we delegate all - * {@code Stream} methods - * @throws NullPointerException if the delegate is null - */ - public DelegatingStream(Stream delegate) { - this.delegate = Objects.requireNonNull(delegate); - } - - // -- BaseStream methods -- - - @Override - public Spliterator spliterator() { - return delegate.spliterator(); - } - - @Override - public boolean isParallel() { - return delegate.isParallel(); - } - - @Override - public Iterator iterator() { - return delegate.iterator(); - } - - // -- Stream methods -- - - @Override - public Stream filter(Predicate predicate) { - return delegate.filter(predicate); - } - - @Override - public Stream map(Function mapper) { - return delegate.map(mapper); - } - - @Override - public IntStream mapToInt(ToIntFunction mapper) { - return delegate.mapToInt(mapper); - } - - @Override - public LongStream mapToLong(ToLongFunction mapper) { - return delegate.mapToLong(mapper); - } - - @Override - public DoubleStream mapToDouble(ToDoubleFunction mapper) { - return delegate.mapToDouble(mapper); - } - - @Override - public Stream flatMap(Function> mapper) { - return delegate.flatMap(mapper); - } - - @Override - public IntStream flatMapToInt(Function mapper) { - return delegate.flatMapToInt(mapper); - } - - @Override - public LongStream flatMapToLong(Function mapper) { - return delegate.flatMapToLong(mapper); - } - - @Override - public DoubleStream flatMapToDouble(Function mapper) { - return delegate.flatMapToDouble(mapper); - } - - @Override - public Stream distinct() { - return delegate.distinct(); - } - - @Override - public Stream sorted() { - return delegate.sorted(); - } - - @Override - public Stream sorted(Comparator comparator) { - return delegate.sorted(comparator); - } - - @Override - public void forEach(Consumer action) { - delegate.forEach(action); - } - - @Override - public void forEachOrdered(Consumer action) { - delegate.forEachOrdered(action); - } - - @Override - 
public Stream peek(Consumer consumer) { - return delegate.peek(consumer); - } - - @Override - public Stream limit(long maxSize) { - return delegate.limit(maxSize); - } - - @Override - public Stream substream(long startingOffset) { - return delegate.substream(startingOffset); - } - - @Override - public Stream substream(long startingOffset, long endingOffset) { - return delegate.substream(startingOffset, endingOffset); - } - - @Override - public A[] toArray(IntFunction generator) { - return delegate.toArray(generator); - } - - @Override - public Object[] toArray() { - return delegate.toArray(); - } - - @Override - public T reduce(T identity, BinaryOperator accumulator) { - return delegate.reduce(identity, accumulator); - } - - @Override - public Optional reduce(BinaryOperator accumulator) { - return delegate.reduce(accumulator); - } - - @Override - public U reduce(U identity, BiFunction accumulator, - BinaryOperator combiner) { - return delegate.reduce(identity, accumulator, combiner); - } - - @Override - public R collect(Supplier resultFactory, - BiConsumer accumulator, - BiConsumer combiner) { - return delegate.collect(resultFactory, accumulator, combiner); - } - - @Override - public R collect(Collector collector) { - return delegate.collect(collector); - } - - @Override - public Optional max(Comparator comparator) { - return delegate.max(comparator); - } - - @Override - public Optional min(Comparator comparator) { - return delegate.min(comparator); - } - - @Override - public long count() { - return delegate.count(); - } - - @Override - public boolean anyMatch(Predicate predicate) { - return delegate.anyMatch(predicate); - } - - @Override - public boolean allMatch(Predicate predicate) { - return delegate.allMatch(predicate); - } - - @Override - public boolean noneMatch(Predicate predicate) { - return delegate.noneMatch(predicate); - } - - @Override - public Optional findFirst() { - return delegate.findFirst(); - } - - @Override - public Optional findAny() { - 
return delegate.findAny(); - } - - @Override - public Stream unordered() { - return delegate.unordered(); - } - - @Override - public Stream sequential() { - return delegate.sequential(); - } - - @Override - public Stream parallel() { - return delegate.parallel(); - } -} diff --git a/src/share/classes/java/util/stream/DoublePipeline.java b/src/share/classes/java/util/stream/DoublePipeline.java --- a/src/share/classes/java/util/stream/DoublePipeline.java +++ b/src/share/classes/java/util/stream/DoublePipeline.java @@ -266,10 +266,11 @@ @Override public void accept(double t) { - // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it - DoubleStream result = mapper.apply(t); - if (result != null) - result.sequential().forEach(i -> downstream.accept(i)); + try (DoubleStream result = mapper.apply(t)) { + // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it + if (result != null) + result.sequential().forEach(i -> downstream.accept(i)); + } } }; } diff --git a/src/share/classes/java/util/stream/DoubleStream.java b/src/share/classes/java/util/stream/DoubleStream.java --- a/src/share/classes/java/util/stream/DoubleStream.java +++ b/src/share/classes/java/util/stream/DoubleStream.java @@ -752,7 +752,8 @@ * elements of a first {@code DoubleStream} succeeded by all the elements of the * second {@code DoubleStream}. The resulting stream is ordered if both * of the input streams are ordered, and parallel if either of the input - * streams is parallel. + * streams is parallel. When the resulting stream is closed, the close + * handlers for both input streams are invoked. 
* * @param a the first stream * @param b the second stream to concatenate on to end of the first stream @@ -764,7 +765,8 @@ Spliterator.OfDouble split = new Streams.ConcatSpliterator.OfDouble( a.spliterator(), b.spliterator()); - return StreamSupport.doubleStream(split, a.isParallel() || b.isParallel()); + DoubleStream stream = StreamSupport.doubleStream(split, a.isParallel() || b.isParallel()); + return stream.onClose(Streams.composedClose(a, b)); } /** diff --git a/src/share/classes/java/util/stream/IntPipeline.java b/src/share/classes/java/util/stream/IntPipeline.java --- a/src/share/classes/java/util/stream/IntPipeline.java +++ b/src/share/classes/java/util/stream/IntPipeline.java @@ -302,10 +302,11 @@ @Override public void accept(int t) { - // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it - IntStream result = mapper.apply(t); - if (result != null) - result.sequential().forEach(i -> downstream.accept(i)); + try (IntStream result = mapper.apply(t)) { + // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it + if (result != null) + result.sequential().forEach(i -> downstream.accept(i)); + } } }; } diff --git a/src/share/classes/java/util/stream/IntStream.java b/src/share/classes/java/util/stream/IntStream.java --- a/src/share/classes/java/util/stream/IntStream.java +++ b/src/share/classes/java/util/stream/IntStream.java @@ -806,7 +806,8 @@ * elements of a first {@code IntStream} succeeded by all the elements of the * second {@code IntStream}. The resulting stream is ordered if both * of the input streams are ordered, and parallel if either of the input - * streams is parallel. + * streams is parallel. When the resulting stream is closed, the close + * handlers for both input streams are invoked. 
* * @param a the first stream * @param b the second stream to concatenate on to end of the first stream @@ -818,7 +819,8 @@ Spliterator.OfInt split = new Streams.ConcatSpliterator.OfInt( a.spliterator(), b.spliterator()); - return StreamSupport.intStream(split, a.isParallel() || b.isParallel()); + IntStream stream = StreamSupport.intStream(split, a.isParallel() || b.isParallel()); + return stream.onClose(Streams.composedClose(a, b)); } /** diff --git a/src/share/classes/java/util/stream/LongPipeline.java b/src/share/classes/java/util/stream/LongPipeline.java --- a/src/share/classes/java/util/stream/LongPipeline.java +++ b/src/share/classes/java/util/stream/LongPipeline.java @@ -283,10 +283,11 @@ @Override public void accept(long t) { - // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it - LongStream result = mapper.apply(t); - if (result != null) - result.sequential().forEach(i -> downstream.accept(i)); + try (LongStream result = mapper.apply(t)) { + // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it + if (result != null) + result.sequential().forEach(i -> downstream.accept(i)); + } } }; } diff --git a/src/share/classes/java/util/stream/LongStream.java b/src/share/classes/java/util/stream/LongStream.java --- a/src/share/classes/java/util/stream/LongStream.java +++ b/src/share/classes/java/util/stream/LongStream.java @@ -812,7 +812,8 @@ * elements of a first {@code LongStream} succeeded by all the elements of the * second {@code LongStream}. The resulting stream is ordered if both * of the input streams are ordered, and parallel if either of the input - * streams is parallel. + * streams is parallel. When the resulting stream is closed, the close + * handlers for both input streams are invoked. 
* * @param a the first stream * @param b the second stream to concatenate on to end of the first stream @@ -824,7 +825,8 @@ Spliterator.OfLong split = new Streams.ConcatSpliterator.OfLong( a.spliterator(), b.spliterator()); - return StreamSupport.longStream(split, a.isParallel() || b.isParallel()); + LongStream stream = StreamSupport.longStream(split, a.isParallel() || b.isParallel()); + return stream.onClose(Streams.composedClose(a, b)); } /** diff --git a/src/share/classes/java/util/stream/ReferencePipeline.java b/src/share/classes/java/util/stream/ReferencePipeline.java --- a/src/share/classes/java/util/stream/ReferencePipeline.java +++ b/src/share/classes/java/util/stream/ReferencePipeline.java @@ -264,10 +264,11 @@ @Override public void accept(P_OUT u) { - // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it - Stream result = mapper.apply(u); - if (result != null) - result.sequential().forEach(downstream); + try (Stream result = mapper.apply(u)) { + // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it + if (result != null) + result.sequential().forEach(downstream); + } } }; } @@ -291,10 +292,11 @@ @Override public void accept(P_OUT u) { - // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it - IntStream result = mapper.apply(u); - if (result != null) - result.sequential().forEach(downstreamAsInt); + try (IntStream result = mapper.apply(u)) { + // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it + if (result != null) + result.sequential().forEach(downstreamAsInt); + } } }; } @@ -318,10 +320,11 @@ @Override public void accept(P_OUT u) { - // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it - DoubleStream result = mapper.apply(u); - if (result != null) - result.sequential().forEach(downstreamAsDouble); + try (DoubleStream 
result = mapper.apply(u)) { + // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it + if (result != null) + result.sequential().forEach(downstreamAsDouble); + } } }; } @@ -345,10 +348,11 @@ @Override public void accept(P_OUT u) { - // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it - LongStream result = mapper.apply(u); - if (result != null) - result.sequential().forEach(downstreamAsLong); + try (LongStream result = mapper.apply(u)) { + // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it + if (result != null) + result.sequential().forEach(downstreamAsLong); + } } }; } diff --git a/src/share/classes/java/util/stream/Stream.java b/src/share/classes/java/util/stream/Stream.java --- a/src/share/classes/java/util/stream/Stream.java +++ b/src/share/classes/java/util/stream/Stream.java @@ -891,7 +891,8 @@ * elements of a first {@code Stream} succeeded by all the elements of the * second {@code Stream}. The resulting stream is ordered if both * of the input streams are ordered, and parallel if either of the input - * streams is parallel. + * streams is parallel. When the resulting stream is closed, the close + * handlers for both input streams are invoked. 
* * @param The type of stream elements * @param a the first stream @@ -906,7 +907,8 @@ @SuppressWarnings("unchecked") Spliterator split = new Streams.ConcatSpliterator.OfRef<>( (Spliterator) a.spliterator(), (Spliterator) b.spliterator()); - return StreamSupport.stream(split, a.isParallel() || b.isParallel()); + Stream stream = StreamSupport.stream(split, a.isParallel() || b.isParallel()); + return stream.onClose(Streams.composedClose(a, b)); } /** diff --git a/src/share/classes/java/util/stream/Streams.java b/src/share/classes/java/util/stream/Streams.java --- a/src/share/classes/java/util/stream/Streams.java +++ b/src/share/classes/java/util/stream/Streams.java @@ -833,4 +833,63 @@ } } } + + /** + * Given two Runnables, return a Runnable that executes both in sequence, + * even if the first throws an exception, and if both throw exceptions, add + * any exceptions thrown by the second as suppressed exceptions of the first. + */ + static Runnable composeWithExceptions(Runnable a, Runnable b) { + return new Runnable() { + @Override + @SuppressWarnings("finally") + public void run() { + try { + a.run(); + } + catch (Throwable e1) { + try { + b.run(); + } + catch (Throwable e2) { + e1.addSuppressed(e2); + } + finally { // addSuppressed can throw exception + throw e1; + } + } + b.run(); + } + }; + } + + /** + * Given two streams, return a Runnable that + * executes both of their {@link BaseStream#close} methods in sequence, + * even if the first throws an exception, and if both throw exceptions, add + * any exceptions thrown by the second as suppressed exceptions of the first. 
+ */ + static Runnable composedClose(BaseStream a, BaseStream b) { + return new Runnable() { + @Override + @SuppressWarnings("finally") + public void run() { + try { + a.close(); + } + catch (Throwable e1) { + try { + b.close(); + } + catch (Throwable e2) { + e1.addSuppressed(e2); + } + finally { // addSuppressed can throw exception + throw e1; + } + } + b.close(); + } + }; + } } diff --git a/test/java/nio/file/Files/StreamTest.java b/test/java/nio/file/Files/StreamTest.java --- a/test/java/nio/file/Files/StreamTest.java +++ b/test/java/nio/file/Files/StreamTest.java @@ -43,14 +43,13 @@ import java.nio.file.Paths; import java.nio.file.attribute.BasicFileAttributes; import java.util.Arrays; -import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Objects; import java.util.Set; import java.util.TreeSet; import java.util.function.BiPredicate; -import java.util.stream.CloseableStream; +import java.util.stream.Stream; import java.util.stream.Collectors; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -138,14 +137,14 @@ } public void testBasic() { - try (CloseableStream s = Files.list(testFolder)) { - Object[] actual = s.sorted(Comparator.naturalOrder()).toArray(); + try (Stream s = Files.list(testFolder)) { + Object[] actual = s.sorted().toArray(); assertEquals(actual, level1); } catch (IOException ioe) { fail("Unexpected IOException"); } - try (CloseableStream s = Files.list(testFolder.resolve("empty"))) { + try (Stream s = Files.list(testFolder.resolve("empty"))) { int count = s.mapToInt(p -> 1).reduce(0, Integer::sum); assertEquals(count, 0, "Expect empty stream."); } catch (IOException ioe) { @@ -154,8 +153,8 @@ } public void testWalk() { - try (CloseableStream s = Files.walk(testFolder)) { - Object[] actual = s.sorted(Comparator.naturalOrder()).toArray(); + try (Stream s = Files.walk(testFolder)) { + Object[] actual = s.sorted().toArray(); assertEquals(actual, all); } catch (IOException 
ioe) { fail("Unexpected IOException"); @@ -163,9 +162,9 @@ } public void testWalkOneLevel() { - try (CloseableStream s = Files.walk(testFolder, 1)) { + try (Stream s = Files.walk(testFolder, 1)) { Object[] actual = s.filter(path -> ! path.equals(testFolder)) - .sorted(Comparator.naturalOrder()) + .sorted() .toArray(); assertEquals(actual, level1); } catch (IOException ioe) { @@ -176,8 +175,8 @@ public void testWalkFollowLink() { // If link is not supported, the directory structure won't have link. // We still want to test the behavior with FOLLOW_LINKS option. - try (CloseableStream s = Files.walk(testFolder, FileVisitOption.FOLLOW_LINKS)) { - Object[] actual = s.sorted(Comparator.naturalOrder()).toArray(); + try (Stream s = Files.walk(testFolder, FileVisitOption.FOLLOW_LINKS)) { + Object[] actual = s.sorted().toArray(); assertEquals(actual, all_folowLinks); } catch (IOException ioe) { fail("Unexpected IOException"); @@ -185,7 +184,7 @@ } private void validateFileSystemLoopException(Path start, Path... 
causes) { - try (CloseableStream s = Files.walk(start, FileVisitOption.FOLLOW_LINKS)) { + try (Stream s = Files.walk(start, FileVisitOption.FOLLOW_LINKS)) { try { int count = s.mapToInt(p -> 1).reduce(0, Integer::sum); fail("Should got FileSystemLoopException, but got " + count + "elements."); @@ -282,28 +281,28 @@ public void testFind() throws IOException { PathBiPredicate pred = new PathBiPredicate((path, attrs) -> true); - try (CloseableStream s = Files.find(testFolder, Integer.MAX_VALUE, pred)) { + try (Stream s = Files.find(testFolder, Integer.MAX_VALUE, pred)) { Set result = s.collect(Collectors.toCollection(TreeSet::new)); assertEquals(pred.visited(), all); assertEquals(result.toArray(new Path[0]), pred.visited()); } pred = new PathBiPredicate((path, attrs) -> attrs.isSymbolicLink()); - try (CloseableStream s = Files.find(testFolder, Integer.MAX_VALUE, pred)) { + try (Stream s = Files.find(testFolder, Integer.MAX_VALUE, pred)) { s.forEach(path -> assertTrue(Files.isSymbolicLink(path))); assertEquals(pred.visited(), all); } pred = new PathBiPredicate((path, attrs) -> path.getFileName().toString().startsWith("e")); - try (CloseableStream s = Files.find(testFolder, Integer.MAX_VALUE, pred)) { + try (Stream s = Files.find(testFolder, Integer.MAX_VALUE, pred)) { s.forEach(path -> assertEquals(path.getFileName().toString(), "empty")); assertEquals(pred.visited(), all); } pred = new PathBiPredicate((path, attrs) -> path.getFileName().toString().startsWith("l") && attrs.isRegularFile()); - try (CloseableStream s = Files.find(testFolder, Integer.MAX_VALUE, pred)) { + try (Stream s = Files.find(testFolder, Integer.MAX_VALUE, pred)) { s.forEach(path -> fail("Expect empty stream")); assertEquals(pred.visited(), all); } @@ -317,14 +316,14 @@ try { // zero lines assertTrue(Files.size(tmpfile) == 0, "File should be empty"); - try (CloseableStream s = Files.lines(tmpfile, US_ASCII)) { + try (Stream s = Files.lines(tmpfile, US_ASCII)) { assertEquals(s.mapToInt(l -> 
1).reduce(0, Integer::sum), 0, "No line expected"); } // one line byte[] hi = { (byte)'h', (byte)'i' }; Files.write(tmpfile, hi); - try (CloseableStream s = Files.lines(tmpfile, US_ASCII)) { + try (Stream s = Files.lines(tmpfile, US_ASCII)) { List lines = s.collect(Collectors.toList()); assertTrue(lines.size() == 1, "One line expected"); assertTrue(lines.get(0).equals("hi"), "'Hi' expected"); @@ -334,7 +333,7 @@ List expected = Arrays.asList("hi", "there"); Files.write(tmpfile, expected, US_ASCII); assertTrue(Files.size(tmpfile) > 0, "File is empty"); - try (CloseableStream s = Files.lines(tmpfile, US_ASCII)) { + try (Stream s = Files.lines(tmpfile, US_ASCII)) { List lines = s.collect(Collectors.toList()); assertTrue(lines.equals(expected), "Unexpected lines"); } @@ -342,7 +341,7 @@ // MalformedInputException byte[] bad = { (byte)0xff, (byte)0xff }; Files.write(tmpfile, bad); - try (CloseableStream s = Files.lines(tmpfile, US_ASCII)) { + try (Stream s = Files.lines(tmpfile, US_ASCII)) { try { List lines = s.collect(Collectors.toList()); throw new RuntimeException("UncheckedIOException expected"); @@ -378,7 +377,7 @@ fsp.setFaultyMode(false); Path fakeRoot = fs.getRoot(); try { - try (CloseableStream s = Files.list(fakeRoot)) { + try (Stream s = Files.list(fakeRoot)) { s.forEach(path -> assertEquals(path.getFileName().toString(), "DirectoryIteratorException")); } } catch (UncheckedIOException uioe) { @@ -398,7 +397,7 @@ } try { - try (CloseableStream s = Files.list(fakeRoot)) { + try (Stream s = Files.list(fakeRoot)) { s.forEach(path -> fail("should not get here")); } } catch (UncheckedIOException uioe) { @@ -427,12 +426,12 @@ try { fsp.setFaultyMode(false); Path fakeRoot = fs.getRoot(); - try (CloseableStream s = Files.list(fakeRoot.resolve("dir2"))) { + try (Stream s = Files.list(fakeRoot.resolve("dir2"))) { // only one file s.forEach(path -> assertEquals(path.getFileName().toString(), "IOException")); } - try (CloseableStream s = 
Files.walk(fakeRoot.resolve("empty"))) { + try (Stream s = Files.walk(fakeRoot.resolve("empty"))) { String[] result = s.map(path -> path.getFileName().toString()) .toArray(String[]::new); // ordered as depth-first @@ -440,13 +439,13 @@ } fsp.setFaultyMode(true); - try (CloseableStream s = Files.list(fakeRoot.resolve("dir2"))) { + try (Stream s = Files.list(fakeRoot.resolve("dir2"))) { s.forEach(path -> fail("should have caused exception")); } catch (UncheckedIOException uioe) { assertTrue(uioe.getCause() instanceof FaultyFileSystem.FaultyException); } - try (CloseableStream s = Files.walk(fakeRoot.resolve("empty"))) { + try (Stream s = Files.walk(fakeRoot.resolve("empty"))) { String[] result = s.map(path -> path.getFileName().toString()) .toArray(String[]::new); fail("should not reach here due to IOException"); @@ -454,7 +453,7 @@ assertTrue(uioe.getCause() instanceof FaultyFileSystem.FaultyException); } - try (CloseableStream s = Files.walk( + try (Stream s = Files.walk( fakeRoot.resolve("empty").resolve("IOException"))) { String[] result = s.map(path -> path.getFileName().toString()) @@ -502,20 +501,20 @@ fsp.setFaultyMode(false); Path fakeRoot = fs.getRoot(); // validate setting - try (CloseableStream s = Files.list(fakeRoot.resolve("empty"))) { + try (Stream s = Files.list(fakeRoot.resolve("empty"))) { String[] result = s.map(path -> path.getFileName().toString()) .toArray(String[]::new); assertEqualsNoOrder(result, new String[] { "SecurityException", "sample" }); } - try (CloseableStream s = Files.walk(fakeRoot.resolve("dir2"))) { + try (Stream s = Files.walk(fakeRoot.resolve("dir2"))) { String[] result = s.map(path -> path.getFileName().toString()) .toArray(String[]::new); assertEqualsNoOrder(result, new String[] { "dir2", "SecurityException", "fileInSE", "file" }); } if (supportsLinks) { - try (CloseableStream s = Files.list(fakeRoot.resolve("dir"))) { + try (Stream s = Files.list(fakeRoot.resolve("dir"))) { String[] result = s.map(path -> 
path.getFileName().toString()) .toArray(String[]::new); assertEqualsNoOrder(result, new String[] { "d1", "f1", "lnDir2", "SecurityException", "lnDirSE", "lnFileSE" }); @@ -525,13 +524,13 @@ // execute test fsp.setFaultyMode(true); // ignore file cause SecurityException - try (CloseableStream s = Files.walk(fakeRoot.resolve("empty"))) { + try (Stream s = Files.walk(fakeRoot.resolve("empty"))) { String[] result = s.map(path -> path.getFileName().toString()) .toArray(String[]::new); assertEqualsNoOrder(result, new String[] { "empty", "sample" }); } // skip folder cause SecurityException - try (CloseableStream s = Files.walk(fakeRoot.resolve("dir2"))) { + try (Stream s = Files.walk(fakeRoot.resolve("dir2"))) { String[] result = s.map(path -> path.getFileName().toString()) .toArray(String[]::new); assertEqualsNoOrder(result, new String[] { "dir2", "file" }); @@ -539,14 +538,14 @@ if (supportsLinks) { // not following links - try (CloseableStream s = Files.walk(fakeRoot.resolve("dir"))) { + try (Stream s = Files.walk(fakeRoot.resolve("dir"))) { String[] result = s.map(path -> path.getFileName().toString()) .toArray(String[]::new); assertEqualsNoOrder(result, new String[] { "dir", "d1", "f1", "lnDir2", "lnDirSE", "lnFileSE" }); } // following links - try (CloseableStream s = Files.walk(fakeRoot.resolve("dir"), FileVisitOption.FOLLOW_LINKS)) { + try (Stream s = Files.walk(fakeRoot.resolve("dir"), FileVisitOption.FOLLOW_LINKS)) { String[] result = s.map(path -> path.getFileName().toString()) .toArray(String[]::new); // ?? Should fileInSE show up? 
@@ -556,19 +555,19 @@ } // list instead of walk - try (CloseableStream s = Files.list(fakeRoot.resolve("empty"))) { + try (Stream s = Files.list(fakeRoot.resolve("empty"))) { String[] result = s.map(path -> path.getFileName().toString()) .toArray(String[]::new); assertEqualsNoOrder(result, new String[] { "sample" }); } - try (CloseableStream s = Files.list(fakeRoot.resolve("dir2"))) { + try (Stream s = Files.list(fakeRoot.resolve("dir2"))) { String[] result = s.map(path -> path.getFileName().toString()) .toArray(String[]::new); assertEqualsNoOrder(result, new String[] { "file" }); } // root cause SecurityException should be reported - try (CloseableStream s = Files.walk( + try (Stream s = Files.walk( fakeRoot.resolve("dir2").resolve("SecurityException"))) { String[] result = s.map(path -> path.getFileName().toString()) @@ -579,7 +578,7 @@ } // Walk a file cause SecurityException, we should get SE - try (CloseableStream s = Files.walk( + try (Stream s = Files.walk( fakeRoot.resolve("dir").resolve("SecurityException"))) { String[] result = s.map(path -> path.getFileName().toString()) @@ -590,7 +589,7 @@ } // List a file cause SecurityException, we should get SE as cannot read attribute - try (CloseableStream s = Files.list( + try (Stream s = Files.list( fakeRoot.resolve("dir2").resolve("SecurityException"))) { String[] result = s.map(path -> path.getFileName().toString()) @@ -600,7 +599,7 @@ assertTrue(se.getCause() instanceof FaultyFileSystem.FaultyException); } - try (CloseableStream s = Files.list( + try (Stream s = Files.list( fakeRoot.resolve("dir").resolve("SecurityException"))) { String[] result = s.map(path -> path.getFileName().toString()) @@ -627,7 +626,7 @@ } public void testConstructException() { - try (CloseableStream s = Files.lines(testFolder.resolve("notExist"), Charset.forName("UTF-8"))) { + try (Stream s = Files.lines(testFolder.resolve("notExist"), Charset.forName("UTF-8"))) { s.forEach(l -> fail("File is not even exist!")); } catch (IOException 
ioe) { assertTrue(ioe instanceof NoSuchFileException); @@ -635,24 +634,26 @@ } public void testClosedStream() throws IOException { - try (CloseableStream s = Files.list(testFolder)) { + try (Stream s = Files.list(testFolder)) { s.close(); - Object[] actual = s.sorted(Comparator.naturalOrder()).toArray(); - assertTrue(actual.length <= level1.length); - } - - try (CloseableStream s = Files.walk(testFolder)) { - s.close(); - Object[] actual = s.sorted(Comparator.naturalOrder()).toArray(); + Object[] actual = s.sorted().toArray(); fail("Operate on closed stream should throw IllegalStateException"); } catch (IllegalStateException ex) { // expected } - try (CloseableStream s = Files.find(testFolder, Integer.MAX_VALUE, + try (Stream s = Files.walk(testFolder)) { + s.close(); + Object[] actual = s.sorted().toArray(); + fail("Operate on closed stream should throw IllegalStateException"); + } catch (IllegalStateException ex) { + // expected + } + + try (Stream s = Files.find(testFolder, Integer.MAX_VALUE, (p, attr) -> true)) { s.close(); - Object[] actual = s.sorted(Comparator.naturalOrder()).toArray(); + Object[] actual = s.sorted().toArray(); fail("Operate on closed stream should throw IllegalStateException"); } catch (IllegalStateException ex) { // expected diff --git a/test/java/util/stream/bootlib/java/util/stream/DoubleStreamTestScenario.java b/test/java/util/stream/bootlib/java/util/stream/DoubleStreamTestScenario.java --- a/test/java/util/stream/bootlib/java/util/stream/DoubleStreamTestScenario.java +++ b/test/java/util/stream/bootlib/java/util/stream/DoubleStreamTestScenario.java @@ -40,7 +40,7 @@ @SuppressWarnings({"rawtypes", "unchecked"}) public enum DoubleStreamTestScenario implements OpTestCase.BaseStreamTestScenario { - STREAM_FOR_EACH(false) { + STREAM_FOR_EACH_WITH_CLOSE(false) { > void _run(TestData data, DoubleConsumer b, Function m) { DoubleStream s = m.apply(data.stream()); @@ -48,6 +48,7 @@ s = s.sequential(); } s.forEach(b); + s.close(); } }, diff 
--git a/test/java/util/stream/bootlib/java/util/stream/IntStreamTestScenario.java b/test/java/util/stream/bootlib/java/util/stream/IntStreamTestScenario.java --- a/test/java/util/stream/bootlib/java/util/stream/IntStreamTestScenario.java +++ b/test/java/util/stream/bootlib/java/util/stream/IntStreamTestScenario.java @@ -40,7 +40,7 @@ @SuppressWarnings({"rawtypes", "unchecked"}) public enum IntStreamTestScenario implements OpTestCase.BaseStreamTestScenario { - STREAM_FOR_EACH(false) { + STREAM_FOR_EACH_WITH_CLOSE(false) { > void _run(TestData data, IntConsumer b, Function m) { IntStream s = m.apply(data.stream()); @@ -48,6 +48,7 @@ s = s.sequential(); } s.forEach(b); + s.close(); } }, diff --git a/test/java/util/stream/bootlib/java/util/stream/LongStreamTestScenario.java b/test/java/util/stream/bootlib/java/util/stream/LongStreamTestScenario.java --- a/test/java/util/stream/bootlib/java/util/stream/LongStreamTestScenario.java +++ b/test/java/util/stream/bootlib/java/util/stream/LongStreamTestScenario.java @@ -40,7 +40,7 @@ @SuppressWarnings({"rawtypes", "unchecked"}) public enum LongStreamTestScenario implements OpTestCase.BaseStreamTestScenario { - STREAM_FOR_EACH(false) { + STREAM_FOR_EACH_WITH_CLOSE(false) { > void _run(TestData data, LongConsumer b, Function m) { LongStream s = m.apply(data.stream()); @@ -48,6 +48,7 @@ s = s.sequential(); } s.forEach(b); + s.close(); } }, diff --git a/test/java/util/stream/bootlib/java/util/stream/StreamTestScenario.java b/test/java/util/stream/bootlib/java/util/stream/StreamTestScenario.java --- a/test/java/util/stream/bootlib/java/util/stream/StreamTestScenario.java +++ b/test/java/util/stream/bootlib/java/util/stream/StreamTestScenario.java @@ -39,7 +39,7 @@ @SuppressWarnings({"rawtypes", "unchecked"}) public enum StreamTestScenario implements OpTestCase.BaseStreamTestScenario { - STREAM_FOR_EACH(false) { + STREAM_FOR_EACH_WITH_CLOSE(false) { > void _run(TestData data, Consumer b, Function> m) { Stream s = 
m.apply(data.stream()); @@ -47,6 +47,7 @@ s = s.sequential(); } s.forEach(b); + s.close(); } }, diff --git a/test/java/util/stream/test/org/openjdk/tests/java/util/stream/StreamCloseTest.java b/test/java/util/stream/test/org/openjdk/tests/java/util/stream/StreamCloseTest.java new file mode 100644 --- /dev/null +++ b/test/java/util/stream/test/org/openjdk/tests/java/util/stream/StreamCloseTest.java @@ -0,0 +1,168 @@ +/* + * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. 
+ */ +package org.openjdk.tests.java.util.stream; + +import java.util.Arrays; +import java.util.stream.OpTestCase; +import java.util.stream.Stream; + +import org.testng.annotations.Test; + +import static java.util.stream.LambdaTestHelpers.countTo; + +/** + * StreamCloseTest + * + * @author Brian Goetz + */ +@Test(groups = { "serialization-hostile" }) +public class StreamCloseTest extends OpTestCase { + public void testEmptyCloseHandler() { + try (Stream ints = countTo(100).stream()) { + ints.forEach(i -> {}); + } + } + + public void testOneCloseHandler() { + final boolean[] holder = new boolean[1]; + Runnable closer = () -> { holder[0] = true; }; + + try (Stream ints = countTo(100).stream()) { + ints.onClose(closer); + ints.forEach(i -> {}); + } + assertTrue(holder[0]); + + Arrays.fill(holder, false); + try (Stream ints = countTo(100).stream().onClose(closer)) { + ints.forEach(i -> {}); + } + assertTrue(holder[0]); + + Arrays.fill(holder, false); + try (Stream ints = countTo(100).stream().filter(e -> true).onClose(closer)) { + ints.forEach(i -> {}); + } + assertTrue(holder[0]); + + Arrays.fill(holder, false); + try (Stream ints = countTo(100).stream().filter(e -> true).onClose(closer).filter(e -> true)) { + ints.forEach(i -> {}); + } + assertTrue(holder[0]); + } + + public void testTwoCloseHandlers() { + final boolean[] holder = new boolean[2]; + Runnable close1 = () -> { holder[0] = true; }; + Runnable close2 = () -> { holder[1] = true; }; + + try (Stream ints = countTo(100).stream()) { + ints.onClose(close1).onClose(close2); + ints.forEach(i -> {}); + } + assertTrue(holder[0] && holder[1]); + + Arrays.fill(holder, false); + try (Stream ints = countTo(100).stream().onClose(close1).onClose(close2)) { + ints.forEach(i -> {}); + } + assertTrue(holder[0] && holder[1]); + + Arrays.fill(holder, false); + try (Stream ints = countTo(100).stream().filter(e -> true).onClose(close1).onClose(close2)) { + ints.forEach(i -> {}); + } + assertTrue(holder[0] && holder[1]); + + 
Arrays.fill(holder, false); + try (Stream ints = countTo(100).stream().filter(e -> true).onClose(close1).onClose(close2).filter(e -> true)) { + ints.forEach(i -> {}); + } + assertTrue(holder[0] && holder[1]); + } + + public void testCascadedExceptions() { + final boolean[] holder = new boolean[3]; + boolean caught = false; + Runnable close1 = () -> { holder[0] = true; throw new RuntimeException("1"); }; + Runnable close2 = () -> { holder[1] = true; throw new RuntimeException("2"); }; + Runnable close3 = () -> { holder[2] = true; throw new RuntimeException("3"); }; + + try (Stream ints = countTo(100).stream()) { + ints.onClose(close1).onClose(close2).onClose(close3); + ints.forEach(i -> {}); + } + catch (RuntimeException e) { + assertCascaded(e, 3); + assertTrue(holder[0] && holder[1] && holder[2]); + caught = true; + } + assertTrue(caught); + + Arrays.fill(holder, false); + caught = false; + try (Stream ints = countTo(100).stream().onClose(close1).onClose(close2).onClose(close3)) { + ints.forEach(i -> {}); + } + catch (RuntimeException e) { + assertCascaded(e, 3); + assertTrue(holder[0] && holder[1] && holder[2]); + caught = true; + } + assertTrue(caught); + + caught = false; + Arrays.fill(holder, false); + try (Stream ints = countTo(100).stream().filter(e -> true).onClose(close1).onClose(close2).onClose(close3)) { + ints.forEach(i -> {}); + } + catch (RuntimeException e) { + assertCascaded(e, 3); + assertTrue(holder[0] && holder[1] && holder[2]); + caught = true; + } + assertTrue(caught); + + caught = false; + Arrays.fill(holder, false); + try (Stream ints = countTo(100).stream().filter(e -> true).onClose(close1).onClose(close2).filter(e -> true).onClose(close3)) { + ints.forEach(i -> {}); + } + catch (RuntimeException e) { + assertCascaded(e, 3); + assertTrue(holder[0] && holder[1] && holder[2]); + caught = true; + } + assertTrue(caught); + } + + private void assertCascaded(RuntimeException e, int n) { + assertTrue(e.getMessage().equals("1")); + 
assertTrue(e.getSuppressed().length == n - 1); + for (int i=0; i