< prev index next >
src/jdk.zipfs/share/classes/jdk/nio/zipfs/ZipFileSystem.java
Print this page
rev 54573 : 8222532: (zipfs) Performance regression when writing ZipFileSystem entries in parallel
Reviewed-by: TBD
*** 33,44 ****
--- 33,46 ----
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
+ import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
+ import java.nio.channels.NonWritableChannelException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.*;
import java.nio.file.attribute.FileAttribute;
*** 597,611 ****
}
if (options.contains(APPEND) && options.contains(TRUNCATE_EXISTING))
throw new IllegalArgumentException("APPEND + TRUNCATE_EXISTING not allowed");
}
-
// Returns an output SeekableByteChannel for either
// (1) writing the contents of a new entry, if the entry doesn't exit, or
// (2) updating/replacing the contents of an existing entry.
! // Note: The content is not compressed.
private class EntryOutputChannel extends ByteArrayChannel {
Entry e;
EntryOutputChannel(Entry e) throws IOException {
super(e.size > 0? (int)e.size : 8192, false);
--- 599,613 ----
}
if (options.contains(APPEND) && options.contains(TRUNCATE_EXISTING))
throw new IllegalArgumentException("APPEND + TRUNCATE_EXISTING not allowed");
}
// Returns an output SeekableByteChannel for either
// (1) writing the contents of a new entry, if the entry doesn't exit, or
// (2) updating/replacing the contents of an existing entry.
! // Note: The content of the channel is not compressed until the
! // channel is closed
private class EntryOutputChannel extends ByteArrayChannel {
Entry e;
EntryOutputChannel(Entry e) throws IOException {
super(e.size > 0? (int)e.size : 8192, false);
*** 620,642 ****
e.flag |= FLAG_USE_UTF8;
}
@Override
public void close() throws IOException {
! e.bytes = toByteArray();
! e.size = e.bytes.length;
! e.crc = -1;
super.close();
- update(e);
}
}
private int getCompressMethod(FileAttribute<?>... attrs) {
return defaultMethod;
}
! // Returns a Writable/ReadByteChannel for now. Might consdier to use
// newFileChannel() instead, which dump the entry data into a regular
// file on the default file system and create a FileChannel on top of
// it.
SeekableByteChannel newByteChannel(byte[] path,
Set<? extends OpenOption> options,
--- 622,643 ----
e.flag |= FLAG_USE_UTF8;
}
@Override
public void close() throws IOException {
! OutputStream os = getOutputStream(e);
! os.write(toByteArray());
! os.close(); // will update the entry
super.close();
}
}
// Returns the compression method to use for an entry. The attrs argument
// is currently ignored: the value of defaultMethod is always returned.
private int getCompressMethod(FileAttribute<?>... attrs) {
return defaultMethod;
}
! // Returns a Writable/ReadByteChannel for now. Might consider to use
// newFileChannel() instead, which dump the entry data into a regular
// file on the default file system and create a FileChannel on top of
// it.
SeekableByteChannel newByteChannel(byte[] path,
Set<? extends OpenOption> options,
*** 645,658 ****
{
checkOptions(options);
if (options.contains(StandardOpenOption.WRITE) ||
options.contains(StandardOpenOption.APPEND)) {
checkWritable();
! beginRead(); // only need a readlock, the "update()" will obtain
! // thewritelock when the channel is closed
try {
- ensureOpen();
Entry e = getEntry(path);
if (e != null) {
if (e.isDir() || options.contains(CREATE_NEW))
throw new FileAlreadyExistsException(getString(path));
SeekableByteChannel sbc =
--- 646,658 ----
{
checkOptions(options);
if (options.contains(StandardOpenOption.WRITE) ||
options.contains(StandardOpenOption.APPEND)) {
checkWritable();
! beginRead(); // only need a read lock, the "update()" will obtain
! // the write lock when the channel is closed
try {
Entry e = getEntry(path);
if (e != null) {
if (e.isDir() || options.contains(CREATE_NEW))
throw new FileAlreadyExistsException(getString(path));
SeekableByteChannel sbc =
*** 674,684 ****
if (!options.contains(CREATE) && !options.contains(CREATE_NEW))
throw new NoSuchFileException(getString(path));
checkParents(path);
return new EntryOutputChannel(
new Entry(path, Entry.NEW, false, getCompressMethod(attrs)));
-
} finally {
endRead();
}
} else {
beginRead();
--- 674,683 ----
*** 741,751 ****
.provider()
.newFileChannel(tmpfile, options, attrs);
final Entry u = isFCH ? e : new Entry(path, tmpfile, Entry.FILECH);
if (forWrite) {
u.flag = FLAG_DATADESCR;
! u.method = getCompressMethod(attrs);
}
// is there a better way to hook into the FileChannel's close method?
return new FileChannel() {
public int write(ByteBuffer src) throws IOException {
return fch.write(src);
--- 740,750 ----
.provider()
.newFileChannel(tmpfile, options, attrs);
final Entry u = isFCH ? e : new Entry(path, tmpfile, Entry.FILECH);
if (forWrite) {
u.flag = FLAG_DATADESCR;
! u.method = getCompressMethod();
}
// is there a better way to hook into the FileChannel's close method?
return new FileChannel() {
public int write(ByteBuffer src) throws IOException {
return fch.write(src);
*** 842,852 ****
}
}
// the outstanding input streams that need to be closed
private Set<InputStream> streams =
! Collections.synchronizedSet(new HashSet<InputStream>());
private Set<Path> tmppaths = Collections.synchronizedSet(new HashSet<Path>());
private Path getTempPathForEntry(byte[] path) throws IOException {
Path tmpPath = createTempFileInSameDirectoryAs(zfpath);
if (path != null) {
--- 841,855 ----
}
}
// the outstanding input streams that need to be closed
private Set<InputStream> streams =
! Collections.synchronizedSet(new HashSet<>());
!
! // the ex-channels and ex-paths that need to be closed/deleted once their
! // outstanding input streams have all been closed by the obtainers.
! private Set<ExChannelCloser> exChClosers = new HashSet<>();
private Set<Path> tmppaths = Collections.synchronizedSet(new HashSet<Path>());
private Path getTempPathForEntry(byte[] path) throws IOException {
Path tmpPath = createTempFileInSameDirectoryAs(zfpath);
if (path != null) {
*** 1200,1249 ****
locoff += n;
}
return written;
}
! private long writeEntry(Entry e, OutputStream os, byte[] buf)
throws IOException {
if (e.bytes == null && e.file == null) // dir, 0-length data
return 0;
long written = 0;
! try (OutputStream os2 = e.method == METHOD_STORED ?
new EntryOutputStreamCRC32(e, os) : new EntryOutputStreamDef(e, os)) {
! if (e.bytes != null) { // in-memory
! os2.write(e.bytes, 0, e.bytes.length);
! } else if (e.file != null) { // tmp file
! if (e.type == Entry.NEW || e.type == Entry.FILECH) {
! try (InputStream is = Files.newInputStream(e.file)) {
! is.transferTo(os2);
! }
! }
! Files.delete(e.file);
! tmppaths.remove(e.file);
}
}
written += e.csize;
if ((e.flag & FLAG_DATADESCR) != 0) {
written += e.writeEXT(os);
}
return written;
}
// sync the zip file system, if there is any udpate
private void sync() throws IOException {
!
if (!hasUpdate)
return;
Path tmpFile = createTempFileInSameDirectoryAs(zfpath);
! try (OutputStream os = new BufferedOutputStream(Files.newOutputStream(tmpFile, WRITE)))
! {
ArrayList<Entry> elist = new ArrayList<>(inodes.size());
long written = 0;
! byte[] buf = new byte[8192];
! Entry e = null;
// write loc
for (IndexNode inode : inodes.values()) {
if (inode instanceof Entry) { // an updated inode
e = (Entry)inode;
--- 1203,1269 ----
locoff += n;
}
return written;
}
+ // Writes the data of entry e (the bytes that follow its LOC header) to os.
+ // Entries whose content was already deflated when it was written (see
+ // DeflatingEntryOutputStream) are copied verbatim; all other entries are
+ // passed through a CRC32 (stored) or deflating wrapper stream.
+ // Returns the number of bytes accounted for: e.csize plus, when
+ // FLAG_DATADESCR is set, whatever e.writeEXT(os) reports. Returns 0
+ // without writing anything when the entry has neither bytes nor a file.
! private long writeEntry(Entry e, OutputStream os)
          throws IOException {
      if (e.bytes == null && e.file == null)    // dir, 0-length data
          return 0;
      long written = 0;
!     // An empty entry that was deflated eagerly has crc == 0 and size == 0
!     // but csize > 0 (the deflater still emits its end-of-stream block),
!     // so testing "crc != 0" alone would miss it and the already-deflated
!     // bytes would be deflated a second time.
!     if (e.csize > 0 && (e.crc != 0 || e.size == 0)) {
!         // pre-compressed entry, write directly to output stream
!         writeTo(e, os);
!     } else {
!         try (OutputStream os2 = (e.method == METHOD_STORED) ?
              new EntryOutputStreamCRC32(e, os) : new EntryOutputStreamDef(e, os)) {
!             writeTo(e, os2);
          }
      }
      written += e.csize;
      if ((e.flag & FLAG_DATADESCR) != 0) {
          written += e.writeEXT(os);
      }
      return written;
  }
+ private void writeTo(Entry e, OutputStream os) throws IOException {
+ if (e.bytes != null) {
+ os.write(e.bytes, 0, e.bytes.length);
+ } else if (e.file != null) {
+ if (e.type == Entry.NEW || e.type == Entry.FILECH) {
+ try (InputStream is = Files.newInputStream(e.file)) {
+ is.transferTo(os);
+ }
+ }
+ Files.delete(e.file);
+ tmppaths.remove(e.file);
+ }
+ }
+
// sync the zip file system, if there is any udpate
private void sync() throws IOException {
! // check ex-closer
! if (!exChClosers.isEmpty()) {
! for (ExChannelCloser ecc : exChClosers) {
! if (ecc.streams.isEmpty()) {
! ecc.ch.close();
! Files.delete(ecc.path);
! exChClosers.remove(ecc);
! }
! }
! }
if (!hasUpdate)
return;
Path tmpFile = createTempFileInSameDirectoryAs(zfpath);
! try (OutputStream os = new BufferedOutputStream(Files.newOutputStream(tmpFile, WRITE))) {
ArrayList<Entry> elist = new ArrayList<>(inodes.size());
long written = 0;
! byte[] buf = null;
! Entry e;
// write loc
for (IndexNode inode : inodes.values()) {
if (inode instanceof Entry) { // an updated inode
e = (Entry)inode;
*** 1252,1266 ****
// entry copy: the only thing changed is the "name"
// and "nlen" in LOC header, so we udpate/rewrite the
// LOC in new file and simply copy the rest (data and
// ext) without enflating/deflating from the old zip
// file LOC entry.
written += copyLOCEntry(e, true, os, written, buf);
} else { // NEW, FILECH or CEN
e.locoff = written;
written += e.writeLOC(os); // write loc header
! written += writeEntry(e, os, buf);
}
elist.add(e);
} catch (IOException x) {
x.printStackTrace(); // skip any in-accurate entry
}
--- 1272,1288 ----
// entry copy: the only thing changed is the "name"
// and "nlen" in LOC header, so we udpate/rewrite the
// LOC in new file and simply copy the rest (data and
// ext) without enflating/deflating from the old zip
// file LOC entry.
+ if (buf == null)
+ buf = new byte[8192];
written += copyLOCEntry(e, true, os, written, buf);
} else { // NEW, FILECH or CEN
e.locoff = written;
written += e.writeLOC(os); // write loc header
! written += writeEntry(e, os);
}
elist.add(e);
} catch (IOException x) {
x.printStackTrace(); // skip any in-accurate entry
}
*** 1272,1281 ****
--- 1294,1305 ----
continue; // no root '/' directory even it
// exits in original zip/jar file.
}
e = Entry.readCEN(this, inode);
try {
+ if (buf == null)
+ buf = new byte[8192];
written += copyLOCEntry(e, false, os, written, buf);
elist.add(e);
} catch (IOException x) {
x.printStackTrace(); // skip any wrong entry
}
*** 1289,1301 ****
}
end.centot = elist.size();
end.cenlen = written - end.cenoff;
end.write(os, written, forceEnd64);
}
!
ch.close();
Files.delete(zfpath);
Files.move(tmpFile, zfpath, REPLACE_EXISTING);
hasUpdate = false; // clear
}
IndexNode getInode(byte[] path) {
--- 1313,1339 ----
}
end.centot = elist.size();
end.cenlen = written - end.cenoff;
end.write(os, written, forceEnd64);
}
! if (!streams.isEmpty()) {
! //
! // There are outstanding input streams open on the existing "ch",
! // so don't close "ch" or delete the file for now; let the
! // "ex-channel-closer" handle them later.
! ExChannelCloser ecc = new ExChannelCloser(
! createTempFileInSameDirectoryAs(zfpath),
! ch,
! streams);
! Files.move(zfpath, ecc.path, REPLACE_EXISTING);
! exChClosers.add(ecc);
! streams = Collections.synchronizedSet(new HashSet<InputStream>());
! } else {
ch.close();
Files.delete(zfpath);
+ }
+
Files.move(tmpFile, zfpath, REPLACE_EXISTING);
hasUpdate = false; // clear
}
IndexNode getInode(byte[] path) {
*** 1349,1363 ****
e.file = getTempPathForEntry(null);
os = Files.newOutputStream(e.file, WRITE);
} else {
os = new ByteArrayOutputStream((e.size > 0)? (int)e.size : 8192);
}
return new EntryOutputStream(e, os);
}
private class EntryOutputStream extends FilterOutputStream {
! private Entry e;
private long written;
private boolean isClosed;
EntryOutputStream(Entry e, OutputStream os) throws IOException {
super(os);
--- 1387,1405 ----
e.file = getTempPathForEntry(null);
os = Files.newOutputStream(e.file, WRITE);
} else {
os = new ByteArrayOutputStream((e.size > 0)? (int)e.size : 8192);
}
+ if (e.method == METHOD_DEFLATED) {
+ return new DeflatingEntryOutputStream(e, os);
+ } else {
return new EntryOutputStream(e, os);
}
+ }
private class EntryOutputStream extends FilterOutputStream {
! private final Entry e;
private long written;
private boolean isClosed;
EntryOutputStream(Entry e, OutputStream os) throws IOException {
super(os);
*** 1390,1406 ****
super.close();
update(e);
}
}
// Wrapper output stream class to write out a "stored" entry.
// (1) this class does not close the underlying out stream when
// being closed.
// (2) no need to be "synchronized", only used by sync()
private class EntryOutputStreamCRC32 extends FilterOutputStream {
! private Entry e;
! private CRC32 crc;
private long written;
private boolean isClosed;
EntryOutputStreamCRC32(Entry e, OutputStream os) throws IOException {
super(os);
--- 1432,1491 ----
super.close();
update(e);
}
}
+ // Output stream returned when writing "deflated" entries into memory,
+ // to enable eager (possibly parallel) deflation and reduce memory required.
+ // Data is deflated as it is written while a CRC-32 of the uncompressed
+ // bytes is accumulated. On close the entry's size, csize and crc (and, for
+ // an in-memory target, its bytes) are filled in and update(e) is called.
+ private class DeflatingEntryOutputStream extends DeflaterOutputStream {
+     private final CRC32 crc;
+     private final Entry e;
+     private boolean isClosed;
+
+     DeflatingEntryOutputStream(Entry e, OutputStream os) throws IOException {
+         super(os, getDeflater());
+         this.e = Objects.requireNonNull(e, "Zip entry is null");
+         this.crc = new CRC32();
+     }
+
+     @Override
+     public synchronized void write(int b) throws IOException {
+         // Delegate to the array form, which is the single place that
+         // updates the CRC. Calling super.write(int) and then updating the
+         // CRC here would count the byte twice, because the superclass
+         // implements write(int) on top of write(byte[], int, int).
+         write(new byte[] { (byte) b }, 0, 1);
+     }
+
+     @Override
+     public synchronized void write(byte b[], int off, int len)
+             throws IOException {
+         super.write(b, off, len);
+         crc.update(b, off, len);
+     }
+
+     @Override
+     public synchronized void close() throws IOException {
+         if (isClosed)
+             return;
+         isClosed = true;
+         try {
+             finish();
+             e.size = def.getBytesRead();
+             e.csize = def.getBytesWritten();
+             e.crc = crc.getValue();
+             if (out instanceof ByteArrayOutputStream)
+                 e.bytes = ((ByteArrayOutputStream)out).toByteArray();
+             super.close();
+             update(e);
+         } finally {
+             // hand the deflater back even if finishing or closing failed
+             releaseDeflater(def);
+         }
+     }
+ }
+
// Wrapper output stream class to write out a "stored" entry.
// (1) this class does not close the underlying out stream when
// being closed.
// (2) no need to be "synchronized", only used by sync()
private class EntryOutputStreamCRC32 extends FilterOutputStream {
! private final CRC32 crc;
! private final Entry e;
private long written;
private boolean isClosed;
EntryOutputStreamCRC32(Entry e, OutputStream os) throws IOException {
super(os);
*** 1436,1447 ****
// Wrapper output stream class to write out a "deflated" entry.
// (1) this class does not close the underlying out stream when
// being closed.
// (2) no need to be "synchronized", only used by sync()
private class EntryOutputStreamDef extends DeflaterOutputStream {
! private CRC32 crc;
! private Entry e;
private boolean isClosed;
EntryOutputStreamDef(Entry e, OutputStream os) throws IOException {
super(os, getDeflater());
this.e = Objects.requireNonNull(e, "Zip entry is null");
--- 1521,1532 ----
// Wrapper output stream class to write out a "deflated" entry.
// (1) this class does not close the underlying out stream when
// being closed.
// (2) no need to be "synchronized", only used by sync()
private class EntryOutputStreamDef extends DeflaterOutputStream {
! private final CRC32 crc;
! private final Entry e;
private boolean isClosed;
EntryOutputStreamDef(Entry e, OutputStream os) throws IOException {
super(os, getDeflater());
this.e = Objects.requireNonNull(e, "Zip entry is null");
*** 1469,1486 ****
}
private InputStream getInputStream(Entry e)
throws IOException
{
! InputStream eis = null;
!
if (e.type == Entry.NEW) {
- // now bytes & file is uncompressed.
if (e.bytes != null)
! return new ByteArrayInputStream(e.bytes);
else if (e.file != null)
! return Files.newInputStream(e.file);
else
throw new ZipException("update entry data is missing");
} else if (e.type == Entry.FILECH) {
// FILECH result is un-compressed.
eis = Files.newInputStream(e.file);
--- 1554,1569 ----
}
private InputStream getInputStream(Entry e)
throws IOException
{
! InputStream eis;
if (e.type == Entry.NEW) {
if (e.bytes != null)
! eis = new ByteArrayInputStream(e.bytes);
else if (e.file != null)
! eis = Files.newInputStream(e.file);
else
throw new ZipException("update entry data is missing");
} else if (e.type == Entry.FILECH) {
// FILECH result is un-compressed.
eis = Files.newInputStream(e.file);
*** 1577,1587 ****
}
if (len > rem) {
len = (int) rem;
}
// readFullyAt()
! long n = 0;
ByteBuffer bb = ByteBuffer.wrap(b);
bb.position(off);
bb.limit(off + len);
synchronized(zfch) {
n = zfch.position(pos).read(bb);
--- 1660,1670 ----
}
if (len > rem) {
len = (int) rem;
}
// readFullyAt()
! long n;
ByteBuffer bb = ByteBuffer.wrap(b);
bb.position(off);
bb.limit(off + len);
synchronized(zfch) {
n = zfch.position(pos).read(bb);
*** 1903,1913 ****
Entry(byte[] name, int type, boolean isdir, int method) {
this(name, isdir, method);
this.type = type;
}
! Entry (Entry e, int type) {
name(e.name);
this.isdir = e.isdir;
this.version = e.version;
this.ctime = e.ctime;
this.atime = e.atime;
--- 1986,1996 ----
// Creates an entry through the (name, isdir, method) constructor and then
// records the given entry type.
Entry(byte[] name, int type, boolean isdir, int method) {
this(name, isdir, method);
this.type = type;
}
! Entry(Entry e, int type) {
name(e.name);
this.isdir = e.isdir;
this.version = e.version;
this.ctime = e.ctime;
this.atime = e.atime;
*** 1926,1936 ****
this.locoff = e.locoff;
this.comment = e.comment;
this.type = type;
}
! Entry (byte[] name, Path file, int type) {
this(name, type, false, METHOD_STORED);
this.file = file;
}
int version() throws ZipException {
--- 2009,2019 ----
this.locoff = e.locoff;
this.comment = e.comment;
this.type = type;
}
// Creates a non-directory entry of the given type that uses METHOD_STORED
// and remembers the given file in this.file.
! Entry(byte[] name, Path file, int type) {
this(name, type, false, METHOD_STORED);
this.file = file;
}
int version() throws ZipException {
*** 2422,2431 ****
--- 2505,2528 ----
fm.close();
return sb.toString();
}
}
+ // Groups a replaced zip file channel, the path its file was moved to, and
+ // the set of input streams that were still open on it. sync() closes the
+ // channel and deletes the path once the stream set has become empty.
+ // The fields are assigned once, in the constructor, so they are final.
+ private static class ExChannelCloser {
+     final Path path;
+     final SeekableByteChannel ch;
+     final Set<InputStream> streams;
+
+     ExChannelCloser(Path path,
+                     SeekableByteChannel ch,
+                     Set<InputStream> streams)
+     {
+         this.path = path;
+         this.ch = ch;
+         this.streams = streams;
+     }
+ }
+
// ZIP directory has two issues:
// (1) ZIP spec does not require the ZIP file to include
// directory entry
// (2) all entries are not stored/organized in a "tree"
// structure.
< prev index next >